Skip to main content

laminar_connectors/
registry.rs

1//! Registry of connector factories, keyed by connector type string.
2
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use arrow_schema::SchemaRef;
7use async_trait::async_trait;
8use parking_lot::RwLock;
9
10use crate::config::{ConnectorConfig, ConnectorInfo};
11use crate::connector::{SinkConnector, SourceConnector};
12use crate::error::ConnectorError;
13use crate::reference::ReferenceTableSource;
14use crate::serde::{self, Format, RecordDeserializer, RecordSerializer};
15
16/// Factory function type for creating source connectors.
17///
18/// The optional `&prometheus::Registry` allows connectors to register
19/// their metrics on the shared Prometheus registry when one is available.
20pub type SourceFactory =
21    Arc<dyn Fn(Option<&prometheus::Registry>) -> Box<dyn SourceConnector> + Send + Sync>;
22
23/// Factory function type for creating sink connectors.
24///
25/// The optional `&prometheus::Registry` allows connectors to register
26/// their metrics on the shared Prometheus registry when one is available.
27pub type SinkFactory =
28    Arc<dyn Fn(Option<&prometheus::Registry>) -> Box<dyn SinkConnector> + Send + Sync>;
29
30/// Factory function type for creating reference table sources.
31pub type TableSourceFactory = Arc<
32    dyn Fn(&ConnectorConfig) -> Result<Box<dyn ReferenceTableSource>, ConnectorError> + Send + Sync,
33>;
34
35/// Factory for constructing a lookup source (async, for on-demand mode).
36///
37/// Previously this was a hand-rolled `Arc<dyn Fn(...) -> Pin<Box<Future>>>`
38/// type alias that nobody could read. A trait with an `async` method
39/// says the same thing without forcing the caller to spell out the
40/// `Pin<Box<...>>`.
41#[async_trait]
42pub trait LookupSourceFactory: Send + Sync {
43    /// Build a lookup source instance from the given config.
44    async fn build(
45        &self,
46        config: ConnectorConfig,
47    ) -> Result<Arc<dyn laminar_core::lookup::source::LookupSourceDyn>, ConnectorError>;
48}
49
50/// Registry of available connector implementations. Connectors register
51/// a factory per type string; the runtime looks up by the `connector`
52/// property in `CREATE SOURCE/SINK` DDL.
53#[derive(Clone)]
54pub struct ConnectorRegistry {
55    sources: Arc<RwLock<HashMap<String, (ConnectorInfo, SourceFactory)>>>,
56    sinks: Arc<RwLock<HashMap<String, (ConnectorInfo, SinkFactory)>>>,
57    table_sources: Arc<RwLock<HashMap<String, (ConnectorInfo, TableSourceFactory)>>>,
58    lookup_sources: Arc<RwLock<HashMap<String, Arc<dyn LookupSourceFactory>>>>,
59}
60
61impl ConnectorRegistry {
62    /// Creates a new empty registry.
63    #[must_use]
64    pub fn new() -> Self {
65        Self {
66            sources: Arc::new(RwLock::new(HashMap::new())),
67            sinks: Arc::new(RwLock::new(HashMap::new())),
68            table_sources: Arc::new(RwLock::new(HashMap::new())),
69            lookup_sources: Arc::new(RwLock::new(HashMap::new())),
70        }
71    }
72
73    /// Registers a source connector factory.
74    pub fn register_source(
75        &self,
76        name: impl Into<String>,
77        info: ConnectorInfo,
78        factory: SourceFactory,
79    ) {
80        self.sources.write().insert(name.into(), (info, factory));
81    }
82
83    /// Registers a sink connector factory.
84    pub fn register_sink(
85        &self,
86        name: impl Into<String>,
87        info: ConnectorInfo,
88        factory: SinkFactory,
89    ) {
90        self.sinks.write().insert(name.into(), (info, factory));
91    }
92
93    /// Run a connector's `discover_schema` against the given
94    /// properties and return the resulting Arrow schema. `None` means
95    /// the connector type is unknown or discovery produced no fields.
96    pub async fn default_source_schema(
97        &self,
98        connector_type: &str,
99        properties: &std::collections::HashMap<String, String>,
100    ) -> Option<SchemaRef> {
101        // Release the read lock before awaiting — `discover_schema` may
102        // do network I/O.
103        let factory = {
104            let sources = self.sources.read();
105            let (_, factory) = sources.get(connector_type)?;
106            factory.clone()
107        };
108
109        let mut instance = factory(None);
110        instance.discover_schema(properties).await;
111        let schema = instance.schema();
112        if schema.fields().is_empty() {
113            None
114        } else {
115            Some(schema)
116        }
117    }
118
119    /// Creates a new source connector instance.
120    ///
121    /// The factory creates a default-configured connector. The caller must
122    /// subsequently call `open(config)` to forward WITH clause properties.
123    ///
124    /// If a `prometheus::Registry` is provided, the connector will register
125    /// its metrics on it so they appear in the scrape output.
126    ///
127    /// # Errors
128    ///
129    /// Returns `ConnectorError::ConfigurationError` if not registered.
130    pub fn create_source(
131        &self,
132        config: &ConnectorConfig,
133        registry: Option<&prometheus::Registry>,
134    ) -> Result<Box<dyn SourceConnector>, ConnectorError> {
135        let sources = self.sources.read();
136        let (_, factory) = sources.get(config.connector_type()).ok_or_else(|| {
137            ConnectorError::ConfigurationError(format!(
138                "unknown source connector type: '{}'",
139                config.connector_type()
140            ))
141        })?;
142        Ok(factory(registry))
143    }
144
145    /// Creates a new sink connector instance.
146    ///
147    /// The connector type is determined by `config.connector_type()`.
148    ///
149    /// If a `prometheus::Registry` is provided, the connector will register
150    /// its metrics on it so they appear in the scrape output.
151    ///
152    /// # Errors
153    ///
154    /// Returns `ConnectorError::ConfigurationError` if the connector type
155    /// is not registered.
156    pub fn create_sink(
157        &self,
158        config: &ConnectorConfig,
159        registry: Option<&prometheus::Registry>,
160    ) -> Result<Box<dyn SinkConnector>, ConnectorError> {
161        let sinks = self.sinks.read();
162        let (_, factory) = sinks.get(config.connector_type()).ok_or_else(|| {
163            ConnectorError::ConfigurationError(format!(
164                "unknown sink connector type: '{}'",
165                config.connector_type()
166            ))
167        })?;
168        Ok(factory(registry))
169    }
170
171    /// Registers a reference table source factory.
172    pub fn register_table_source(
173        &self,
174        name: impl Into<String>,
175        info: ConnectorInfo,
176        factory: TableSourceFactory,
177    ) {
178        self.table_sources
179            .write()
180            .insert(name.into(), (info, factory));
181    }
182
183    /// Creates a new reference table source instance.
184    ///
185    /// The connector type is determined by `config.connector_type()`.
186    ///
187    /// # Errors
188    ///
189    /// Returns `ConnectorError::ConfigurationError` if the connector type
190    /// is not registered as a table source.
191    pub fn create_table_source(
192        &self,
193        config: &ConnectorConfig,
194    ) -> Result<Box<dyn ReferenceTableSource>, ConnectorError> {
195        let table_sources = self.table_sources.read();
196        let (_, factory) = table_sources.get(config.connector_type()).ok_or_else(|| {
197            ConnectorError::ConfigurationError(format!(
198                "unknown table source connector type: '{}'",
199                config.connector_type()
200            ))
201        })?;
202        factory(config)
203    }
204
205    /// Lists all registered table source connector names.
206    #[must_use]
207    pub fn list_table_sources(&self) -> Vec<String> {
208        self.table_sources.read().keys().cloned().collect()
209    }
210
211    /// Registers a lookup source factory for on-demand/partial cache mode.
212    pub fn register_lookup_source(
213        &self,
214        name: impl Into<String>,
215        factory: Arc<dyn LookupSourceFactory>,
216    ) {
217        self.lookup_sources.write().insert(name.into(), factory);
218    }
219
220    /// Creates a lookup source for on-demand cache-miss fallback.
221    ///
222    /// Returns `None` if no lookup source factory is registered for
223    /// the given connector type.
224    pub async fn create_lookup_source(
225        &self,
226        config: ConnectorConfig,
227    ) -> Option<Result<Arc<dyn laminar_core::lookup::source::LookupSourceDyn>, ConnectorError>>
228    {
229        let factory = {
230            let lookup_sources = self.lookup_sources.read();
231            Arc::clone(lookup_sources.get(config.connector_type())?)
232        };
233        Some(factory.build(config).await)
234    }
235
236    /// Returns information about a registered source connector.
237    #[must_use]
238    pub fn source_info(&self, name: &str) -> Option<ConnectorInfo> {
239        self.sources.read().get(name).map(|(info, _)| info.clone())
240    }
241
242    /// Returns information about a registered sink connector.
243    #[must_use]
244    pub fn sink_info(&self, name: &str) -> Option<ConnectorInfo> {
245        self.sinks.read().get(name).map(|(info, _)| info.clone())
246    }
247
248    /// Lists all registered source connector names.
249    #[must_use]
250    pub fn list_sources(&self) -> Vec<String> {
251        self.sources.read().keys().cloned().collect()
252    }
253
254    /// Lists all registered sink connector names.
255    #[must_use]
256    pub fn list_sinks(&self) -> Vec<String> {
257        self.sinks.read().keys().cloned().collect()
258    }
259
260    /// Creates a deserializer for the given format string.
261    ///
262    /// # Errors
263    ///
264    /// Returns `ConnectorError::Serde` if the format is not supported.
265    pub fn create_deserializer(
266        &self,
267        format: &str,
268    ) -> Result<Box<dyn RecordDeserializer>, ConnectorError> {
269        let fmt = Format::parse(format).map_err(ConnectorError::Serde)?;
270        serde::create_deserializer(fmt).map_err(ConnectorError::Serde)
271    }
272
273    /// Creates a serializer for the given format string.
274    ///
275    /// # Errors
276    ///
277    /// Returns `ConnectorError::Serde` if the format is not supported.
278    pub fn create_serializer(
279        &self,
280        format: &str,
281    ) -> Result<Box<dyn RecordSerializer>, ConnectorError> {
282        let fmt = Format::parse(format).map_err(ConnectorError::Serde)?;
283        serde::create_serializer(fmt).map_err(ConnectorError::Serde)
284    }
285}
286
287impl Default for ConnectorRegistry {
288    fn default() -> Self {
289        Self::new()
290    }
291}
292
293impl std::fmt::Debug for ConnectorRegistry {
294    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
295        f.debug_struct("ConnectorRegistry")
296            .field("sources", &self.list_sources())
297            .field("sinks", &self.list_sinks())
298            .field("table_sources", &self.list_table_sources())
299            .finish()
300    }
301}
302
#[cfg(test)]
mod tests {
    use super::*;
    use crate::testing::*;

    /// Builds a `ConnectorInfo` with placeholder metadata for tests.
    fn mock_info(name: &str, is_source: bool, is_sink: bool) -> ConnectorInfo {
        ConnectorInfo {
            name: name.to_string(),
            display_name: name.to_string(),
            version: "0.1.0".to_string(),
            is_source,
            is_sink,
            config_keys: vec![],
        }
    }

    /// Factory yielding a fresh `MockSourceConnector`; ignores the metrics registry.
    fn mock_source_factory() -> SourceFactory {
        Arc::new(|_: Option<&prometheus::Registry>| Box::new(MockSourceConnector::new()))
    }

    /// Factory yielding a fresh `MockSinkConnector`; ignores the metrics registry.
    fn mock_sink_factory() -> SinkFactory {
        Arc::new(|_: Option<&prometheus::Registry>| Box::new(MockSinkConnector::new()))
    }

    /// A registry holding exactly one mock source, registered as `name`.
    fn registry_with_source(name: &str) -> ConnectorRegistry {
        let registry = ConnectorRegistry::new();
        registry.register_source(name, mock_info(name, true, false), mock_source_factory());
        registry
    }

    #[test]
    fn test_register_and_create_source() {
        let registry = registry_with_source("mock");
        let created = registry.create_source(&ConnectorConfig::new("mock"), None);
        assert!(created.is_ok());
    }

    #[test]
    fn test_register_and_create_sink() {
        let registry = ConnectorRegistry::new();
        registry.register_sink("mock", mock_info("mock", false, true), mock_sink_factory());

        let created = registry.create_sink(&ConnectorConfig::new("mock"), None);
        assert!(created.is_ok());
    }

    #[test]
    fn test_create_unknown_connector() {
        let registry = ConnectorRegistry::new();
        let missing = ConnectorConfig::new("nonexistent");

        assert!(registry.create_source(&missing, None).is_err());
        assert!(registry.create_sink(&missing, None).is_err());
    }

    #[test]
    fn test_list_connectors() {
        let registry = registry_with_source("kafka");
        registry.register_sink("delta", mock_info("delta", false, true), mock_sink_factory());

        assert_eq!(registry.list_sources(), vec!["kafka".to_string()]);
        assert_eq!(registry.list_sinks(), vec!["delta".to_string()]);
    }

    #[test]
    fn test_connector_info() {
        let registry = registry_with_source("kafka");

        let info = registry
            .source_info("kafka")
            .expect("kafka was just registered");
        assert_eq!(info.name, "kafka");
        assert!(registry.source_info("nonexistent").is_none());
    }

    #[test]
    fn test_format_registry() {
        let registry = ConnectorRegistry::new();

        assert!(registry.create_deserializer("json").is_ok());
        assert!(registry.create_serializer("csv").is_ok());
        assert!(registry.create_deserializer("unknown").is_err());
    }

    #[tokio::test]
    async fn default_source_schema_some_when_discovered() {
        let registry = registry_with_source("mock");
        let no_properties = std::collections::HashMap::new();

        let schema = registry.default_source_schema("mock", &no_properties).await;
        assert!(schema.is_some_and(|s| !s.fields().is_empty()));
    }

    #[tokio::test]
    async fn default_source_schema_none_for_unknown_connector() {
        let registry = ConnectorRegistry::new();
        let no_properties = std::collections::HashMap::new();

        let schema = registry.default_source_schema("nope", &no_properties).await;
        assert!(schema.is_none());
    }

    // ── Table source factory tests ──

    /// Factory yielding an empty `MockReferenceTableSource` for any config.
    fn mock_table_factory() -> TableSourceFactory {
        Arc::new(|_config| Ok(Box::new(crate::reference::MockReferenceTableSource::empty())))
    }

    #[test]
    fn test_register_and_create_table_source() {
        let registry = ConnectorRegistry::new();
        registry.register_table_source(
            "mock",
            mock_info("mock", true, false),
            mock_table_factory(),
        );

        let created = registry.create_table_source(&ConnectorConfig::new("mock"));
        assert!(created.is_ok());
    }

    #[test]
    fn test_create_unknown_table_source() {
        let registry = ConnectorRegistry::new();
        let outcome = registry.create_table_source(&ConnectorConfig::new("nonexistent"));

        let Err(e) = outcome else {
            panic!("Expected error for unknown table source");
        };
        assert!(e.to_string().contains("unknown table source"), "got: {e}");
    }

    #[test]
    fn test_list_table_sources() {
        let registry = ConnectorRegistry::new();
        assert!(registry.list_table_sources().is_empty());

        registry.register_table_source(
            "mock-table",
            mock_info("mock-table", true, false),
            mock_table_factory(),
        );

        assert_eq!(registry.list_table_sources(), vec!["mock-table".to_string()]);
    }
}