Skip to main content

laminar_connectors/
config.rs

1//! Connector configuration types.
2//!
3//! Provides a generic configuration model for connectors:
4//! - `ConnectorConfig`: Key-value configuration with validation
5//! - `ConfigKeySpec`: Specification for a configuration key
6//! - `ConnectorInfo`: Metadata about a connector implementation
7//! - `ConnectorState`: Lifecycle state of a running connector
8#![allow(clippy::disallowed_types)] // cold path: connector configuration
9
10use std::collections::HashMap;
11use std::fmt;
12use std::sync::Arc;
13
14use arrow_schema::{DataType, Field, Schema, SchemaRef};
15
16use crate::error::ConnectorError;
17
18/// Configuration for a connector instance.
19///
20/// Connectors receive their configuration as a string key-value map,
21/// typically parsed from SQL `WITH (...)` clauses or programmatic config.
22#[derive(Debug, Clone, Default)]
23pub struct ConnectorConfig {
24    /// The connector type identifier (e.g., "kafka", "postgres-cdc").
25    connector_type: String,
26
27    /// Configuration properties.
28    properties: HashMap<String, String>,
29}
30
31impl ConnectorConfig {
32    /// Creates a new connector config with the given type.
33    #[must_use]
34    pub fn new(connector_type: impl Into<String>) -> Self {
35        Self {
36            connector_type: connector_type.into(),
37            properties: HashMap::new(),
38        }
39    }
40
41    /// Creates a config from existing properties.
42    #[must_use]
43    pub fn with_properties(
44        connector_type: impl Into<String>,
45        properties: HashMap<String, String>,
46    ) -> Self {
47        Self {
48            connector_type: connector_type.into(),
49            properties,
50        }
51    }
52
53    /// Returns the connector type identifier.
54    #[must_use]
55    pub fn connector_type(&self) -> &str {
56        &self.connector_type
57    }
58
59    /// Sets a configuration property.
60    pub fn set(&mut self, key: impl Into<String>, value: impl Into<String>) {
61        self.properties.insert(key.into(), value.into());
62    }
63
64    /// Gets a configuration property.
65    #[must_use]
66    pub fn get(&self, key: &str) -> Option<&str> {
67        self.properties.get(key).map(String::as_str)
68    }
69
70    /// Gets a required configuration property, returning an error if missing.
71    ///
72    /// # Errors
73    ///
74    /// Returns `ConnectorError::MissingConfig` if the key is not set.
75    pub fn require(&self, key: &str) -> Result<&str, ConnectorError> {
76        self.get(key)
77            .ok_or_else(|| ConnectorError::MissingConfig(key.to_string()))
78    }
79
80    /// Gets a property parsed as the given type.
81    ///
82    /// # Errors
83    ///
84    /// Returns `ConnectorError::ConfigurationError` if the value cannot be parsed.
85    pub fn get_parsed<T: std::str::FromStr>(&self, key: &str) -> Result<Option<T>, ConnectorError>
86    where
87        T::Err: fmt::Display,
88    {
89        match self.get(key) {
90            Some(v) => v.parse::<T>().map(Some).map_err(|e| {
91                ConnectorError::ConfigurationError(format!("invalid value for '{key}': {e}"))
92            }),
93            None => Ok(None),
94        }
95    }
96
97    /// Gets a required property parsed as the given type.
98    ///
99    /// # Errors
100    ///
101    /// Returns `ConnectorError::MissingConfig` if the key is missing, or
102    /// `ConnectorError::ConfigurationError` if parsing fails.
103    pub fn require_parsed<T: std::str::FromStr>(&self, key: &str) -> Result<T, ConnectorError>
104    where
105        T::Err: fmt::Display,
106    {
107        let value = self.require(key)?;
108        value.parse::<T>().map_err(|e| {
109            ConnectorError::ConfigurationError(format!("invalid value for '{key}': {e}"))
110        })
111    }
112
113    /// Returns all properties as a reference.
114    #[must_use]
115    pub fn properties(&self) -> &HashMap<String, String> {
116        &self.properties
117    }
118
119    /// Returns properties with a given prefix, with the prefix stripped.
120    #[must_use]
121    pub fn properties_with_prefix(&self, prefix: &str) -> HashMap<String, String> {
122        self.properties
123            .iter()
124            .filter_map(|(k, v)| {
125                k.strip_prefix(prefix)
126                    .map(|stripped| (stripped.to_string(), v.clone()))
127            })
128            .collect()
129    }
130
131    /// Returns the SQL-defined Arrow schema passed via `_arrow_schema`,
132    /// or `None` if absent or unparseable.
133    ///
134    /// The compact format is `"col1:Type1,col2:Type2,..."` as produced by
135    /// `encode_arrow_schema` in `laminar-db`.
136    ///
137    /// **Note:** This method re-parses the schema string on each call.
138    /// Cache the result if you need it in a hot loop.
139    #[must_use]
140    pub fn arrow_schema(&self) -> Option<SchemaRef> {
141        let s = self.get("_arrow_schema")?;
142        let fields: Vec<Field> = s
143            .split(',')
144            .filter(|part| !part.is_empty())
145            .filter_map(|part| {
146                let (name, type_str) = part.split_once(':')?;
147                let dt = match type_str {
148                    "Utf8" => DataType::Utf8,
149                    "LargeUtf8" => DataType::LargeUtf8,
150                    "Float64" => DataType::Float64,
151                    "Float32" => DataType::Float32,
152                    "Int64" => DataType::Int64,
153                    "Int32" => DataType::Int32,
154                    "Int16" => DataType::Int16,
155                    "Int8" => DataType::Int8,
156                    "UInt64" => DataType::UInt64,
157                    "UInt32" => DataType::UInt32,
158                    "Boolean" => DataType::Boolean,
159                    "Date32" => DataType::Date32,
160                    "Date64" => DataType::Date64,
161                    "Timestamp(Millisecond)" => {
162                        DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None)
163                    }
164                    "Timestamp(Microsecond)" => {
165                        DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None)
166                    }
167                    "Timestamp(Nanosecond)" => {
168                        DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None)
169                    }
170                    "Timestamp(Second)" => {
171                        DataType::Timestamp(arrow_schema::TimeUnit::Second, None)
172                    }
173                    other => {
174                        tracing::warn!(
175                            type_str = other,
176                            field = name,
177                            "unknown Arrow type in _arrow_schema — column skipped"
178                        );
179                        return None;
180                    }
181                };
182                Some(Field::new(name, dt, true))
183            })
184            .collect();
185
186        if fields.is_empty() {
187            None
188        } else {
189            Some(Arc::new(Schema::new(fields)))
190        }
191    }
192
193    /// Formats all properties for display with secrets redacted.
194    ///
195    /// Uses [`SecretMasker`](crate::storage::SecretMasker) to replace
196    /// values of secret keys (passwords, access keys, tokens) with `"***"`.
197    #[cfg(test)]
198    #[must_use]
199    pub fn display_redacted(&self) -> String {
200        use crate::storage::SecretMasker;
201        SecretMasker::display_map(&self.properties)
202    }
203
204    /// Validates the configuration against a set of key specifications.
205    ///
206    /// # Errors
207    ///
208    /// Returns `ConnectorError::MissingConfig` for missing required keys, or
209    /// `ConnectorError::ConfigurationError` for invalid values.
210    pub fn validate(&self, specs: &[ConfigKeySpec]) -> Result<(), ConnectorError> {
211        for spec in specs {
212            if spec.required && self.get(&spec.key).is_none() {
213                if let Some(ref default) = spec.default {
214                    // Has a default, skip
215                    let _ = default;
216                } else {
217                    return Err(ConnectorError::MissingConfig(spec.key.clone()));
218                }
219            }
220        }
221        Ok(())
222    }
223}
224
225/// Validates that a string field is non-empty.
226///
227/// # Errors
228///
229/// Returns `ConnectorError::ConfigurationError` if the value is empty.
230pub fn require_non_empty(value: &str, field_name: &str) -> Result<(), ConnectorError> {
231    if value.is_empty() {
232        return Err(ConnectorError::ConfigurationError(format!(
233            "{field_name} must not be empty",
234        )));
235    }
236    Ok(())
237}
238
239/// Parses a port string into a `u16`.
240///
241/// # Errors
242///
243/// Returns `ConnectorError::ConfigurationError` if the value is not a valid port number.
244pub fn parse_port(value: &str) -> Result<u16, ConnectorError> {
245    value
246        .parse()
247        .map_err(|_| ConnectorError::ConfigurationError(format!("invalid port: '{value}'")))
248}
249
/// Declaration of one configuration key a connector understands.
///
/// Connectors publish a list of these to describe the settings they expect.
#[derive(Debug, Clone)]
pub struct ConfigKeySpec {
    /// Name of the key as it appears in the configuration map.
    pub key: String,

    /// Explanation of the key, intended for humans.
    pub description: String,

    /// `true` when the key must be supplied.
    pub required: bool,

    /// Value used when the key is not supplied, if any.
    pub default: Option<String>,
}
267
268impl ConfigKeySpec {
269    /// Creates a required configuration key spec.
270    #[must_use]
271    pub fn required(key: impl Into<String>, description: impl Into<String>) -> Self {
272        Self {
273            key: key.into(),
274            description: description.into(),
275            required: true,
276            default: None,
277        }
278    }
279
280    /// Creates an optional configuration key spec with a default value.
281    #[must_use]
282    pub fn optional(
283        key: impl Into<String>,
284        description: impl Into<String>,
285        default: impl Into<String>,
286    ) -> Self {
287        Self {
288            key: key.into(),
289            description: description.into(),
290            required: false,
291            default: Some(default.into()),
292        }
293    }
294}
295
296/// Metadata about a connector implementation.
297#[derive(Debug, Clone)]
298pub struct ConnectorInfo {
299    /// Unique connector type name (e.g., "kafka", "postgres-cdc").
300    pub name: String,
301
302    /// Human-readable display name.
303    pub display_name: String,
304
305    /// Version string.
306    pub version: String,
307
308    /// Whether this is a source connector.
309    pub is_source: bool,
310
311    /// Whether this is a sink connector.
312    pub is_sink: bool,
313
314    /// Configuration keys this connector accepts.
315    pub config_keys: Vec<ConfigKeySpec>,
316}
317
/// Lifecycle state of a running connector.
///
/// Derives `Hash` alongside `Eq` so states can be used as keys in hash-based
/// collections (for example, per-state counters).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ConnectorState {
    /// Connector has been created but not yet opened.
    Created,

    /// Connector is initializing (connecting, schema discovery, etc.).
    Initializing,

    /// Connector is running and processing data.
    Running,

    /// Connector is paused (e.g., due to backpressure or manual pause).
    Paused,

    /// Connector encountered an error and is attempting recovery.
    Recovering,

    /// Connector has been closed.
    Closed,

    /// Connector has failed and cannot recover.
    Failed,
}
342
343impl fmt::Display for ConnectorState {
344    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
345        match self {
346            ConnectorState::Created => write!(f, "Created"),
347            ConnectorState::Initializing => write!(f, "Initializing"),
348            ConnectorState::Running => write!(f, "Running"),
349            ConnectorState::Paused => write!(f, "Paused"),
350            ConnectorState::Recovering => write!(f, "Recovering"),
351            ConnectorState::Closed => write!(f, "Closed"),
352            ConnectorState::Failed => write!(f, "Failed"),
353        }
354    }
355}
356
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_config_basic_operations() {
        let mut config = ConnectorConfig::new("kafka");
        config.set("bootstrap.servers", "localhost:9092");
        config.set("topic", "events");

        assert_eq!(config.connector_type(), "kafka");
        assert_eq!(config.get("bootstrap.servers"), Some("localhost:9092"));
        assert_eq!(config.get("topic"), Some("events"));
        assert_eq!(config.get("missing"), None);
    }

    #[test]
    fn test_config_require() {
        let mut config = ConnectorConfig::new("kafka");
        config.set("topic", "events");

        assert!(config.require("topic").is_ok());
        assert!(config.require("missing").is_err());
    }

    #[test]
    fn test_config_parsed() {
        let mut config = ConnectorConfig::new("kafka");
        config.set("batch.size", "1000");
        config.set("bad_number", "not_a_number");

        let size: Option<usize> = config.get_parsed("batch.size").unwrap();
        assert_eq!(size, Some(1000));

        let missing: Option<usize> = config.get_parsed("missing").unwrap();
        assert_eq!(missing, None);

        let bad: Result<Option<usize>, _> = config.get_parsed("bad_number");
        assert!(bad.is_err());
    }

    #[test]
    fn test_config_require_parsed() {
        let mut config = ConnectorConfig::new("test");
        config.set("port", "8080");

        let port: u16 = config.require_parsed("port").unwrap();
        assert_eq!(port, 8080);

        let missing: Result<u16, _> = config.require_parsed("missing");
        assert!(missing.is_err());
    }

    #[test]
    fn test_config_require_parsed_invalid_value() {
        let mut config = ConnectorConfig::new("test");
        config.set("port", "not-a-port");

        let err = config.require_parsed::<u16>("port").unwrap_err();
        assert!(matches!(
            err,
            ConnectorError::ConfigurationError(ref msg) if msg.contains("'port'")
        ));
    }

    #[test]
    fn test_config_prefix_extraction() {
        let mut config = ConnectorConfig::new("kafka");
        config.set("kafka.bootstrap.servers", "localhost:9092");
        config.set("kafka.group.id", "my-group");
        config.set("topic", "events");

        let kafka_props = config.properties_with_prefix("kafka.");
        assert_eq!(kafka_props.len(), 2);
        assert_eq!(
            kafka_props.get("bootstrap.servers"),
            Some(&"localhost:9092".to_string())
        );
        assert_eq!(kafka_props.get("group.id"), Some(&"my-group".to_string()));
    }

    #[test]
    fn test_config_validate() {
        let specs = vec![
            ConfigKeySpec::required("topic", "Kafka topic"),
            ConfigKeySpec::optional("batch.size", "Batch size", "100"),
        ];

        let mut config = ConnectorConfig::new("kafka");
        config.set("topic", "events");

        assert!(config.validate(&specs).is_ok());

        let empty_config = ConnectorConfig::new("kafka");
        assert!(empty_config.validate(&specs).is_err());
    }

    #[test]
    fn test_config_validate_reports_missing_key() {
        let specs = vec![ConfigKeySpec::required("topic", "Kafka topic")];
        let err = ConnectorConfig::new("kafka").validate(&specs).unwrap_err();
        assert!(matches!(err, ConnectorError::MissingConfig(ref key) if key == "topic"));
    }

    #[test]
    fn test_config_validate_required_key_with_default() {
        // A required key that declares a default is satisfied even when unset.
        let spec = ConfigKeySpec {
            key: "timeout.ms".to_string(),
            description: "Timeout".to_string(),
            required: true,
            default: Some("1000".to_string()),
        };
        let config = ConnectorConfig::new("kafka");
        assert!(config.validate(&[spec]).is_ok());
    }

    #[test]
    fn test_config_with_properties() {
        let mut props = HashMap::new();
        props.insert("key1".to_string(), "val1".to_string());
        props.insert("key2".to_string(), "val2".to_string());

        let config = ConnectorConfig::with_properties("test", props);
        assert_eq!(config.get("key1"), Some("val1"));
        assert_eq!(config.get("key2"), Some("val2"));
    }

    #[test]
    fn test_require_non_empty() {
        assert!(require_non_empty("events", "topic").is_ok());
        assert!(require_non_empty("", "topic").is_err());
    }

    #[test]
    fn test_parse_port() {
        assert_eq!(parse_port("9092").unwrap(), 9092);
        assert!(parse_port("65536").is_err());
        assert!(parse_port("-1").is_err());
        assert!(parse_port("").is_err());
    }

    #[test]
    fn test_connector_state_display() {
        assert_eq!(ConnectorState::Running.to_string(), "Running");
        assert_eq!(ConnectorState::Failed.to_string(), "Failed");
    }

    #[test]
    fn test_connector_state_display_all_variants() {
        let cases = [
            (ConnectorState::Created, "Created"),
            (ConnectorState::Initializing, "Initializing"),
            (ConnectorState::Running, "Running"),
            (ConnectorState::Paused, "Paused"),
            (ConnectorState::Recovering, "Recovering"),
            (ConnectorState::Closed, "Closed"),
            (ConnectorState::Failed, "Failed"),
        ];
        for (state, expected) in &cases {
            assert_eq!(state.to_string(), *expected);
        }
    }

    #[test]
    fn test_display_redacted_masks_secrets() {
        let mut config = ConnectorConfig::new("delta-lake");
        config.set("aws_region", "us-east-1");
        config.set("aws_secret_access_key", "TOP_SECRET");
        config.set("aws_access_key_id", "AKID123");

        let display = config.display_redacted();
        assert!(display.contains("aws_region=us-east-1"));
        assert!(display.contains("aws_secret_access_key=***"));
        assert!(display.contains("aws_access_key_id=AKID123"));
        assert!(!display.contains("TOP_SECRET"));
    }

    #[test]
    fn test_display_redacted_empty() {
        let config = ConnectorConfig::new("test");
        assert!(config.display_redacted().is_empty());
    }

    #[test]
    fn test_arrow_schema_parses_compact_encoding() {
        let mut config = ConnectorConfig::new("websocket");
        config.set("_arrow_schema", "s:Utf8,p:Float64,q:Float64,T:Int64");

        let schema = config.arrow_schema().expect("should parse");
        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "s");
        assert_eq!(schema.field(0).data_type(), &DataType::Utf8);
        assert_eq!(schema.field(1).name(), "p");
        assert_eq!(schema.field(1).data_type(), &DataType::Float64);
        assert_eq!(schema.field(3).name(), "T");
        assert_eq!(schema.field(3).data_type(), &DataType::Int64);
        assert!(schema.field(0).is_nullable());
    }

    #[test]
    fn test_arrow_schema_skips_unknown_and_malformed_columns() {
        let mut config = ConnectorConfig::new("websocket");
        config.set(
            "_arrow_schema",
            "a:Utf8,b:Decimal128,c,d:Timestamp(Millisecond)",
        );

        let schema = config.arrow_schema().expect("known columns should parse");
        assert_eq!(schema.fields().len(), 2);
        assert_eq!(schema.field(0).name(), "a");
        assert_eq!(schema.field(1).name(), "d");
        assert_eq!(
            schema.field(1).data_type(),
            &DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None)
        );
    }

    #[test]
    fn test_arrow_schema_returns_none_when_all_columns_unknown() {
        let mut config = ConnectorConfig::new("kafka");
        config.set("_arrow_schema", "x:Decimal128,y:Struct");
        assert!(config.arrow_schema().is_none());
    }

    #[test]
    fn test_arrow_schema_returns_none_when_absent() {
        let config = ConnectorConfig::new("kafka");
        assert!(config.arrow_schema().is_none());
    }

    #[test]
    fn test_arrow_schema_returns_none_for_empty_value() {
        let mut config = ConnectorConfig::new("kafka");
        config.set("_arrow_schema", "");
        assert!(config.arrow_schema().is_none());
    }
}