Skip to main content

laminar_connectors/
registry.rs

1//! Registry of connector factories, keyed by connector type string.
2
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use arrow_schema::SchemaRef;
7use async_trait::async_trait;
8use parking_lot::RwLock;
9
10use crate::config::{ConnectorConfig, ConnectorInfo};
11use crate::connector::{SinkConnector, SourceConnector};
12use crate::error::ConnectorError;
13use crate::reference::ReferenceTableSource;
14use crate::serde::{self, Format, RecordDeserializer, RecordSerializer};
15
16/// Factory function type for creating source connectors.
17///
18/// The optional `&prometheus::Registry` allows connectors to register
19/// their metrics on the shared Prometheus registry when one is available.
20pub type SourceFactory =
21    Arc<dyn Fn(Option<&prometheus::Registry>) -> Box<dyn SourceConnector> + Send + Sync>;
22
23/// Factory function type for creating sink connectors.
24///
25/// The optional `&prometheus::Registry` allows connectors to register
26/// their metrics on the shared Prometheus registry when one is available.
27pub type SinkFactory =
28    Arc<dyn Fn(Option<&prometheus::Registry>) -> Box<dyn SinkConnector> + Send + Sync>;
29
30/// Factory function type for creating reference table sources.
31pub type TableSourceFactory = Arc<
32    dyn Fn(&ConnectorConfig) -> Result<Box<dyn ReferenceTableSource>, ConnectorError> + Send + Sync,
33>;
34
35/// Factory for constructing a lookup source (async, for on-demand mode).
36///
37/// Previously this was a hand-rolled `Arc<dyn Fn(...) -> Pin<Box<Future>>>`
38/// type alias that nobody could read. A trait with an `async` method
39/// says the same thing without forcing the caller to spell out the
40/// `Pin<Box<...>>`.
41#[async_trait]
42pub trait LookupSourceFactory: Send + Sync {
43    /// Build a lookup source instance from the given config.
44    ///
45    /// `declared_schema` is the table's declared Arrow schema (from the
46    /// `CREATE LOOKUP TABLE` columns), when known. Schema-bearing sources
47    /// (Delta/Iceberg/Postgres) derive their own and ignore it; schemaless
48    /// sources (`MongoDB`) need it to project documents into typed columns.
49    async fn build(
50        &self,
51        config: ConnectorConfig,
52        declared_schema: Option<arrow_schema::SchemaRef>,
53    ) -> Result<Arc<dyn laminar_core::lookup::source::LookupSourceDyn>, ConnectorError>;
54}
55
56/// Registry of available connector implementations. Connectors register
57/// a factory per type string; the runtime looks up by the `connector`
58/// property in `CREATE SOURCE/SINK` DDL.
59#[derive(Clone)]
60pub struct ConnectorRegistry {
61    sources: Arc<RwLock<HashMap<String, (ConnectorInfo, SourceFactory)>>>,
62    sinks: Arc<RwLock<HashMap<String, (ConnectorInfo, SinkFactory)>>>,
63    table_sources: Arc<RwLock<HashMap<String, (ConnectorInfo, TableSourceFactory)>>>,
64    lookup_sources: Arc<RwLock<HashMap<String, Arc<dyn LookupSourceFactory>>>>,
65}
66
67impl ConnectorRegistry {
68    /// Creates a new empty registry.
69    #[must_use]
70    pub fn new() -> Self {
71        Self {
72            sources: Arc::new(RwLock::new(HashMap::new())),
73            sinks: Arc::new(RwLock::new(HashMap::new())),
74            table_sources: Arc::new(RwLock::new(HashMap::new())),
75            lookup_sources: Arc::new(RwLock::new(HashMap::new())),
76        }
77    }
78
79    /// Registers a source connector factory.
80    pub fn register_source(
81        &self,
82        name: impl Into<String>,
83        info: ConnectorInfo,
84        factory: SourceFactory,
85    ) {
86        self.sources.write().insert(name.into(), (info, factory));
87    }
88
89    /// Registers a sink connector factory.
90    pub fn register_sink(
91        &self,
92        name: impl Into<String>,
93        info: ConnectorInfo,
94        factory: SinkFactory,
95    ) {
96        self.sinks.write().insert(name.into(), (info, factory));
97    }
98
99    /// Run a connector's `discover_schema` against the given properties.
100    ///
101    /// Three outcomes:
102    /// - `Ok(Some(schema))` — discovery succeeded and produced fields.
103    /// - `Ok(None)` — connector type is unknown OR the connector chose
104    ///   not to discover (e.g. non-Avro Kafka format, missing SR url).
105    /// - `Err(_)` — discovery was attempted and failed with a specific
106    ///   cause; callers should surface the message verbatim.
107    ///
108    /// # Errors
109    ///
110    /// Returns the underlying [`ConnectorError`] from
111    /// [`SourceConnector::discover_schema`] when discovery fails (bad
112    /// config, unreachable network endpoint, timeout, etc.).
113    pub async fn default_source_schema(
114        &self,
115        connector_type: &str,
116        properties: &std::collections::HashMap<String, String>,
117    ) -> Result<Option<SchemaRef>, ConnectorError> {
118        let factory = {
119            let sources = self.sources.read();
120            let Some((_, factory)) = sources.get(connector_type) else {
121                return Ok(None);
122            };
123            factory.clone()
124        };
125
126        let mut instance = factory(None);
127        instance.discover_schema(properties).await?;
128        let schema = instance.schema();
129        Ok((!schema.fields().is_empty()).then_some(schema))
130    }
131
132    /// Creates a new source connector instance.
133    ///
134    /// The factory creates a default-configured connector. The caller must
135    /// subsequently call `open(config)` to forward WITH clause properties.
136    ///
137    /// If a `prometheus::Registry` is provided, the connector will register
138    /// its metrics on it so they appear in the scrape output.
139    ///
140    /// # Errors
141    ///
142    /// Returns `ConnectorError::ConfigurationError` if not registered.
143    pub fn create_source(
144        &self,
145        config: &ConnectorConfig,
146        registry: Option<&prometheus::Registry>,
147    ) -> Result<Box<dyn SourceConnector>, ConnectorError> {
148        let sources = self.sources.read();
149        let (_, factory) = sources.get(config.connector_type()).ok_or_else(|| {
150            ConnectorError::ConfigurationError(format!(
151                "unknown source connector type: '{}'",
152                config.connector_type()
153            ))
154        })?;
155        Ok(factory(registry))
156    }
157
158    /// Creates a new sink connector instance.
159    ///
160    /// The connector type is determined by `config.connector_type()`.
161    ///
162    /// If a `prometheus::Registry` is provided, the connector will register
163    /// its metrics on it so they appear in the scrape output.
164    ///
165    /// # Errors
166    ///
167    /// Returns `ConnectorError::ConfigurationError` if the connector type
168    /// is not registered.
169    pub fn create_sink(
170        &self,
171        config: &ConnectorConfig,
172        registry: Option<&prometheus::Registry>,
173    ) -> Result<Box<dyn SinkConnector>, ConnectorError> {
174        let sinks = self.sinks.read();
175        let (_, factory) = sinks.get(config.connector_type()).ok_or_else(|| {
176            ConnectorError::ConfigurationError(format!(
177                "unknown sink connector type: '{}'",
178                config.connector_type()
179            ))
180        })?;
181        Ok(factory(registry))
182    }
183
184    /// Registers a reference table source factory.
185    pub fn register_table_source(
186        &self,
187        name: impl Into<String>,
188        info: ConnectorInfo,
189        factory: TableSourceFactory,
190    ) {
191        self.table_sources
192            .write()
193            .insert(name.into(), (info, factory));
194    }
195
196    /// Creates a new reference table source instance.
197    ///
198    /// The connector type is determined by `config.connector_type()`.
199    ///
200    /// # Errors
201    ///
202    /// Returns `ConnectorError::ConfigurationError` if the connector type
203    /// is not registered as a table source.
204    pub fn create_table_source(
205        &self,
206        config: &ConnectorConfig,
207    ) -> Result<Box<dyn ReferenceTableSource>, ConnectorError> {
208        let table_sources = self.table_sources.read();
209        let (_, factory) = table_sources.get(config.connector_type()).ok_or_else(|| {
210            ConnectorError::ConfigurationError(format!(
211                "unknown table source connector type: '{}'",
212                config.connector_type()
213            ))
214        })?;
215        factory(config)
216    }
217
218    /// Lists all registered table source connector names.
219    #[must_use]
220    pub fn list_table_sources(&self) -> Vec<String> {
221        self.table_sources.read().keys().cloned().collect()
222    }
223
224    /// Registers a lookup source factory for on-demand/partial cache mode.
225    pub fn register_lookup_source(
226        &self,
227        name: impl Into<String>,
228        factory: Arc<dyn LookupSourceFactory>,
229    ) {
230        self.lookup_sources.write().insert(name.into(), factory);
231    }
232
233    /// Creates a lookup source for on-demand cache-miss fallback.
234    ///
235    /// Returns `None` if no lookup source factory is registered for
236    /// the given connector type.
237    pub async fn create_lookup_source(
238        &self,
239        config: ConnectorConfig,
240        declared_schema: Option<arrow_schema::SchemaRef>,
241    ) -> Option<Result<Arc<dyn laminar_core::lookup::source::LookupSourceDyn>, ConnectorError>>
242    {
243        let factory = {
244            let lookup_sources = self.lookup_sources.read();
245            Arc::clone(lookup_sources.get(config.connector_type())?)
246        };
247        Some(factory.build(config, declared_schema).await)
248    }
249
250    /// Returns information about a registered source connector.
251    #[must_use]
252    pub fn source_info(&self, name: &str) -> Option<ConnectorInfo> {
253        self.sources.read().get(name).map(|(info, _)| info.clone())
254    }
255
256    /// Returns information about a registered sink connector.
257    #[must_use]
258    pub fn sink_info(&self, name: &str) -> Option<ConnectorInfo> {
259        self.sinks.read().get(name).map(|(info, _)| info.clone())
260    }
261
262    /// Lists all registered source connector names.
263    #[must_use]
264    pub fn list_sources(&self) -> Vec<String> {
265        self.sources.read().keys().cloned().collect()
266    }
267
268    /// Lists all registered sink connector names.
269    #[must_use]
270    pub fn list_sinks(&self) -> Vec<String> {
271        self.sinks.read().keys().cloned().collect()
272    }
273
274    /// Creates a deserializer for the given format string.
275    ///
276    /// # Errors
277    ///
278    /// Returns `ConnectorError::Serde` if the format is not supported.
279    pub fn create_deserializer(
280        &self,
281        format: &str,
282    ) -> Result<Box<dyn RecordDeserializer>, ConnectorError> {
283        let fmt = Format::parse(format).map_err(ConnectorError::Serde)?;
284        serde::create_deserializer(fmt).map_err(ConnectorError::Serde)
285    }
286
287    /// Creates a serializer for the given format string.
288    ///
289    /// # Errors
290    ///
291    /// Returns `ConnectorError::Serde` if the format is not supported.
292    pub fn create_serializer(
293        &self,
294        format: &str,
295    ) -> Result<Box<dyn RecordSerializer>, ConnectorError> {
296        let fmt = Format::parse(format).map_err(ConnectorError::Serde)?;
297        serde::create_serializer(fmt).map_err(ConnectorError::Serde)
298    }
299}
300
301impl Default for ConnectorRegistry {
302    fn default() -> Self {
303        Self::new()
304    }
305}
306
307impl std::fmt::Debug for ConnectorRegistry {
308    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
309        f.debug_struct("ConnectorRegistry")
310            .field("sources", &self.list_sources())
311            .field("sinks", &self.list_sinks())
312            .field("table_sources", &self.list_table_sources())
313            .finish()
314    }
315}
316
#[cfg(test)]
mod tests {
    use super::*;
    use crate::reference::MockReferenceTableSource;
    use crate::testing::*;

    /// Minimal `ConnectorInfo` fixture with the given role flags.
    fn mock_info(name: &str, is_source: bool, is_sink: bool) -> ConnectorInfo {
        let label = name.to_owned();
        ConnectorInfo {
            display_name: label.clone(),
            name: label,
            version: String::from("0.1.0"),
            is_source,
            is_sink,
            config_keys: Vec::new(),
        }
    }

    /// Factory producing a fresh `MockSourceConnector` on every call.
    fn mock_source_factory() -> SourceFactory {
        Arc::new(|_: Option<&prometheus::Registry>| Box::new(MockSourceConnector::new()))
    }

    /// Factory producing a fresh `MockSinkConnector` on every call.
    fn mock_sink_factory() -> SinkFactory {
        Arc::new(|_: Option<&prometheus::Registry>| Box::new(MockSinkConnector::new()))
    }

    /// Factory producing an empty `MockReferenceTableSource` for any config.
    fn mock_table_source_factory() -> TableSourceFactory {
        Arc::new(|_config| Ok(Box::new(MockReferenceTableSource::empty())))
    }

    #[test]
    fn test_register_and_create_source() {
        let reg = ConnectorRegistry::new();
        reg.register_source("mock", mock_info("mock", true, false), mock_source_factory());

        let outcome = reg.create_source(&ConnectorConfig::new("mock"), None);
        assert!(outcome.is_ok());
    }

    #[test]
    fn test_register_and_create_sink() {
        let reg = ConnectorRegistry::new();
        reg.register_sink("mock", mock_info("mock", false, true), mock_sink_factory());

        let outcome = reg.create_sink(&ConnectorConfig::new("mock"), None);
        assert!(outcome.is_ok());
    }

    #[test]
    fn test_create_unknown_connector() {
        let reg = ConnectorRegistry::new();
        let missing = ConnectorConfig::new("nonexistent");

        assert!(reg.create_source(&missing, None).is_err());
        assert!(reg.create_sink(&missing, None).is_err());
    }

    #[test]
    fn test_list_connectors() {
        let reg = ConnectorRegistry::new();
        reg.register_source("kafka", mock_info("kafka", true, false), mock_source_factory());
        reg.register_sink("delta", mock_info("delta", false, true), mock_sink_factory());

        // Exactly one entry each, and it is the one registered.
        assert_eq!(reg.list_sources(), vec!["kafka".to_string()]);
        assert_eq!(reg.list_sinks(), vec!["delta".to_string()]);
    }

    #[test]
    fn test_connector_info() {
        let reg = ConnectorRegistry::new();
        reg.register_source("kafka", mock_info("kafka", true, false), mock_source_factory());

        let info = reg.source_info("kafka").expect("kafka was registered");
        assert_eq!(info.name, "kafka");

        assert!(reg.source_info("nonexistent").is_none());
    }

    #[test]
    fn test_format_registry() {
        let reg = ConnectorRegistry::new();

        assert!(reg.create_deserializer("json").is_ok());
        assert!(reg.create_serializer("csv").is_ok());
        assert!(reg.create_deserializer("unknown").is_err());
    }

    #[tokio::test]
    async fn default_source_schema_some_when_discovered() {
        let reg = ConnectorRegistry::new();
        reg.register_source("mock", mock_info("mock", true, false), mock_source_factory());

        let no_props = std::collections::HashMap::new();
        let discovered = reg
            .default_source_schema("mock", &no_props)
            .await
            .expect("discovery must not fail");
        assert!(discovered.is_some_and(|s| !s.fields().is_empty()));
    }

    #[tokio::test]
    async fn default_source_schema_none_for_unknown_connector() {
        let reg = ConnectorRegistry::new();
        let outcome = reg
            .default_source_schema("nope", &std::collections::HashMap::new())
            .await
            .expect("unknown connector is Ok(None), not Err");
        assert!(outcome.is_none());
    }

    // ── Table source factory tests ──

    #[test]
    fn test_register_and_create_table_source() {
        let reg = ConnectorRegistry::new();
        reg.register_table_source(
            "mock",
            mock_info("mock", true, false),
            mock_table_source_factory(),
        );

        let outcome = reg.create_table_source(&ConnectorConfig::new("mock"));
        assert!(outcome.is_ok());
    }

    #[test]
    fn test_create_unknown_table_source() {
        let reg = ConnectorRegistry::new();
        let Err(e) = reg.create_table_source(&ConnectorConfig::new("nonexistent")) else {
            panic!("Expected error for unknown table source");
        };
        assert!(e.to_string().contains("unknown table source"), "got: {e}");
    }

    #[test]
    fn test_list_table_sources() {
        let reg = ConnectorRegistry::new();
        assert!(reg.list_table_sources().is_empty());

        reg.register_table_source(
            "mock-table",
            mock_info("mock-table", true, false),
            mock_table_source_factory(),
        );

        assert_eq!(reg.list_table_sources(), vec!["mock-table".to_string()]);
    }
}