Skip to main content

laminar_connectors/
config.rs

1#![allow(clippy::disallowed_types)] // cold path: connector configuration
2
3use std::collections::HashMap;
4use std::fmt;
5use std::sync::Arc;
6
7use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
8use arrow_schema::{Schema, SchemaRef};
9
10use crate::error::ConnectorError;
11
12/// Encode an Arrow schema as a hex-encoded IPC flatbuffer string.
13#[must_use]
14pub fn encode_arrow_schema_ipc(schema: &Schema) -> String {
15    let mut dt = DictionaryTracker::new(false);
16    let enc = IpcDataGenerator::default().schema_to_bytes_with_dictionary_tracker(
17        schema,
18        &mut dt,
19        &IpcWriteOptions::default(),
20    );
21    let bytes = &enc.ipc_message;
22    let mut s = String::with_capacity(bytes.len() * 2);
23    for b in bytes {
24        use std::fmt::Write;
25        let _ = write!(s, "{b:02x}");
26    }
27    s
28}
29
30/// Decode a hex-encoded IPC flatbuffer string to an Arrow schema.
31#[must_use]
32pub fn decode_arrow_schema_ipc(hex: &str) -> Option<Schema> {
33    let bytes: Option<Vec<u8>> = (0..hex.len())
34        .step_by(2)
35        .map(|i| {
36            hex.get(i..i + 2)
37                .and_then(|h| u8::from_str_radix(h, 16).ok())
38        })
39        .collect();
40    arrow_ipc::convert::try_schema_from_flatbuffer_bytes(&bytes?).ok()
41}
42
43/// Configuration for a connector instance.
44///
45/// Connectors receive their configuration as a string key-value map,
46/// typically parsed from SQL `WITH (...)` clauses or programmatic config.
47#[derive(Debug, Clone, Default)]
48pub struct ConnectorConfig {
49    /// The connector type identifier (e.g., "kafka", "postgres-cdc").
50    connector_type: String,
51
52    /// Configuration properties.
53    properties: HashMap<String, String>,
54}
55
56impl ConnectorConfig {
57    /// Creates a new connector config with the given type.
58    #[must_use]
59    pub fn new(connector_type: impl Into<String>) -> Self {
60        Self {
61            connector_type: connector_type.into(),
62            properties: HashMap::new(),
63        }
64    }
65
66    /// Creates a config from existing properties.
67    #[must_use]
68    pub fn with_properties(
69        connector_type: impl Into<String>,
70        properties: HashMap<String, String>,
71    ) -> Self {
72        Self {
73            connector_type: connector_type.into(),
74            properties,
75        }
76    }
77
78    /// Returns the connector type identifier.
79    #[must_use]
80    pub fn connector_type(&self) -> &str {
81        &self.connector_type
82    }
83
84    /// Sets a configuration property.
85    pub fn set(&mut self, key: impl Into<String>, value: impl Into<String>) {
86        self.properties.insert(key.into(), value.into());
87    }
88
89    /// Gets a configuration property.
90    #[must_use]
91    pub fn get(&self, key: &str) -> Option<&str> {
92        self.properties.get(key).map(String::as_str)
93    }
94
95    /// Gets a required configuration property, returning an error if missing.
96    ///
97    /// # Errors
98    ///
99    /// Returns `ConnectorError::MissingConfig` if the key is not set.
100    pub fn require(&self, key: &str) -> Result<&str, ConnectorError> {
101        self.get(key)
102            .ok_or_else(|| ConnectorError::missing_config(key.to_string()))
103    }
104
105    /// Gets a property parsed as the given type.
106    ///
107    /// # Errors
108    ///
109    /// Returns `ConnectorError::ConfigurationError` if the value cannot be parsed.
110    pub fn get_parsed<T: std::str::FromStr>(&self, key: &str) -> Result<Option<T>, ConnectorError>
111    where
112        T::Err: fmt::Display,
113    {
114        match self.get(key) {
115            Some(v) => v.parse::<T>().map(Some).map_err(|e| {
116                ConnectorError::ConfigurationError(format!("invalid value for '{key}': {e}"))
117            }),
118            None => Ok(None),
119        }
120    }
121
122    /// Gets a required property parsed as the given type.
123    ///
124    /// # Errors
125    ///
126    /// Returns `ConnectorError::MissingConfig` if the key is missing, or
127    /// `ConnectorError::ConfigurationError` if parsing fails.
128    pub fn require_parsed<T: std::str::FromStr>(&self, key: &str) -> Result<T, ConnectorError>
129    where
130        T::Err: fmt::Display,
131    {
132        let value = self.require(key)?;
133        value.parse::<T>().map_err(|e| {
134            ConnectorError::ConfigurationError(format!("invalid value for '{key}': {e}"))
135        })
136    }
137
138    /// Returns all properties as a reference.
139    #[must_use]
140    pub fn properties(&self) -> &HashMap<String, String> {
141        &self.properties
142    }
143
144    /// Returns properties with a given prefix, with the prefix stripped.
145    #[must_use]
146    pub fn properties_with_prefix(&self, prefix: &str) -> HashMap<String, String> {
147        self.properties
148            .iter()
149            .filter_map(|(k, v)| {
150                k.strip_prefix(prefix)
151                    .map(|stripped| (stripped.to_string(), v.clone()))
152            })
153            .collect()
154    }
155
156    /// Decodes the `_arrow_schema` IPC flatbuffer, or `None` if absent.
157    #[must_use]
158    pub fn arrow_schema(&self) -> Option<SchemaRef> {
159        let s = self.get("_arrow_schema")?;
160        decode_arrow_schema_ipc(s).map(Arc::new)
161    }
162
163    /// Formats all properties for display with secrets redacted.
164    ///
165    /// Uses [`SecretMasker`](crate::storage::SecretMasker) to replace
166    /// values of secret keys (passwords, access keys, tokens) with `"***"`.
167    #[cfg(test)]
168    #[must_use]
169    pub fn display_redacted(&self) -> String {
170        use crate::storage::SecretMasker;
171        SecretMasker::display_map(&self.properties)
172    }
173
174    /// Validates the configuration against a set of key specifications.
175    ///
176    /// # Errors
177    ///
178    /// Returns `ConnectorError::MissingConfig` for missing required keys, or
179    /// `ConnectorError::ConfigurationError` for invalid values.
180    pub fn validate(&self, specs: &[ConfigKeySpec]) -> Result<(), ConnectorError> {
181        for spec in specs {
182            if spec.required && self.get(&spec.key).is_none() {
183                if let Some(ref default) = spec.default {
184                    // Has a default, skip
185                    let _ = default;
186                } else {
187                    return Err(ConnectorError::missing_config(spec.key.clone()));
188                }
189            }
190        }
191        Ok(())
192    }
193}
194
195/// Validates that a string field is non-empty.
196///
197/// # Errors
198///
199/// Returns `ConnectorError::ConfigurationError` if the value is empty.
200pub fn require_non_empty(value: &str, field_name: &str) -> Result<(), ConnectorError> {
201    if value.is_empty() {
202        return Err(ConnectorError::ConfigurationError(format!(
203            "{field_name} must not be empty",
204        )));
205    }
206    Ok(())
207}
208
209/// Parses a port string into a `u16`.
210///
211/// # Errors
212///
213/// Returns `ConnectorError::ConfigurationError` if the value is not a valid port number.
214pub fn parse_port(value: &str) -> Result<u16, ConnectorError> {
215    value
216        .parse()
217        .map_err(|_| ConnectorError::ConfigurationError(format!("invalid port: '{value}'")))
218}
219
/// Declares one configuration key that a connector understands.
///
/// Connectors list these to describe the configuration they expect, and
/// `ConnectorConfig::validate` checks a config against such a list.
#[derive(Debug, Clone)]
pub struct ConfigKeySpec {
    /// Name of the key as it appears in the property map.
    pub key: String,

    /// Human-readable explanation of what the key controls.
    pub description: String,

    /// `true` if the key must be supplied.
    pub required: bool,

    /// Value used when the key is not supplied, if any.
    pub default: Option<String>,
}

impl ConfigKeySpec {
    /// Builds a spec for a key that must be supplied and has no default.
    #[must_use]
    pub fn required(key: impl Into<String>, description: impl Into<String>) -> Self {
        let (key, description) = (key.into(), description.into());
        Self {
            key,
            description,
            required: true,
            default: None,
        }
    }

    /// Builds a spec for an optional key that falls back to `default`.
    #[must_use]
    pub fn optional(
        key: impl Into<String>,
        description: impl Into<String>,
        default: impl Into<String>,
    ) -> Self {
        Self {
            required: false,
            default: Some(default.into()),
            ..Self::required(key, description)
        }
    }
}
265
266/// Metadata about a connector implementation.
267#[derive(Debug, Clone)]
268pub struct ConnectorInfo {
269    /// Unique connector type name (e.g., "kafka", "postgres-cdc").
270    pub name: String,
271
272    /// Human-readable display name.
273    pub display_name: String,
274
275    /// Version string.
276    pub version: String,
277
278    /// Whether this is a source connector.
279    pub is_source: bool,
280
281    /// Whether this is a sink connector.
282    pub is_sink: bool,
283
284    /// Configuration keys this connector accepts.
285    pub config_keys: Vec<ConfigKeySpec>,
286}
287
/// Lifecycle state of a running connector.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectorState {
    /// Connector has been created but not yet opened.
    Created,

    /// Connector is initializing (connecting, schema discovery, etc.).
    Initializing,

    /// Connector is running and processing data.
    Running,

    /// Connector is paused (e.g., due to backpressure or manual pause).
    Paused,

    /// Connector encountered an error and is attempting recovery.
    Recovering,

    /// Connector has been closed.
    Closed,

    /// Connector has failed and cannot recover.
    Failed,
}

impl ConnectorState {
    /// Returns the state's name, exactly as rendered by its `Display` impl.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            ConnectorState::Created => "Created",
            ConnectorState::Initializing => "Initializing",
            ConnectorState::Running => "Running",
            ConnectorState::Paused => "Paused",
            ConnectorState::Recovering => "Recovering",
            ConnectorState::Closed => "Closed",
            ConnectorState::Failed => "Failed",
        }
    }
}

impl fmt::Display for ConnectorState {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `Formatter::pad` honours width, fill, alignment and precision flags
        // (e.g. `{:<12}`); `write!` with a literal silently ignores them.
        f.pad(self.as_str())
    }
}
326
#[cfg(test)]
mod tests {
    // `super::*` also brings in the parent's imports (`Schema`, `HashMap`, `Arc`,
    // `ConnectorError`), so no separate `use` lines are needed here.
    use super::*;

    #[test]
    fn test_config_basic_operations() {
        let mut config = ConnectorConfig::new("kafka");
        config.set("bootstrap.servers", "localhost:9092");
        config.set("topic", "events");

        assert_eq!(config.connector_type(), "kafka");
        assert_eq!(config.get("bootstrap.servers"), Some("localhost:9092"));
        assert_eq!(config.get("topic"), Some("events"));
        assert_eq!(config.get("missing"), None);
    }

    #[test]
    fn test_config_require() {
        let mut config = ConnectorConfig::new("kafka");
        config.set("topic", "events");

        assert!(config.require("topic").is_ok());
        assert!(config.require("missing").is_err());
    }

    #[test]
    fn test_config_parsed() {
        let mut config = ConnectorConfig::new("kafka");
        config.set("batch.size", "1000");
        config.set("bad_number", "not_a_number");

        let size: Option<usize> = config.get_parsed("batch.size").unwrap();
        assert_eq!(size, Some(1000));

        let missing: Option<usize> = config.get_parsed("missing").unwrap();
        assert_eq!(missing, None);

        let bad: Result<Option<usize>, _> = config.get_parsed("bad_number");
        assert!(bad.is_err());
    }

    #[test]
    fn test_config_get_parsed_error_is_configuration_error() {
        let mut config = ConnectorConfig::new("kafka");
        config.set("batch.size", "lots");

        let err = config.get_parsed::<usize>("batch.size").unwrap_err();
        assert!(matches!(
            err,
            ConnectorError::ConfigurationError(ref msg)
                if msg.starts_with("invalid value for 'batch.size'")
        ));
    }

    #[test]
    fn test_config_require_parsed() {
        let mut config = ConnectorConfig::new("test");
        config.set("port", "8080");

        let port: u16 = config.require_parsed("port").unwrap();
        assert_eq!(port, 8080);

        let missing: Result<u16, _> = config.require_parsed("missing");
        assert!(missing.is_err());
    }

    #[test]
    fn test_config_require_parsed_invalid_value() {
        let mut config = ConnectorConfig::new("test");
        config.set("port", "eighty");

        let err = config.require_parsed::<u16>("port").unwrap_err();
        assert!(matches!(err, ConnectorError::ConfigurationError(_)));
    }

    #[test]
    fn test_config_prefix_extraction() {
        let mut config = ConnectorConfig::new("kafka");
        config.set("kafka.bootstrap.servers", "localhost:9092");
        config.set("kafka.group.id", "my-group");
        config.set("topic", "events");

        let kafka_props = config.properties_with_prefix("kafka.");
        assert_eq!(kafka_props.len(), 2);
        assert_eq!(
            kafka_props.get("bootstrap.servers"),
            Some(&"localhost:9092".to_string())
        );
        assert_eq!(kafka_props.get("group.id"), Some(&"my-group".to_string()));
    }

    #[test]
    fn test_config_prefix_extraction_no_match() {
        let mut config = ConnectorConfig::new("kafka");
        config.set("topic", "events");

        assert!(config.properties_with_prefix("kafka.").is_empty());
    }

    #[test]
    fn test_config_validate() {
        let specs = vec![
            ConfigKeySpec::required("topic", "Kafka topic"),
            ConfigKeySpec::optional("batch.size", "Batch size", "100"),
        ];

        let mut config = ConnectorConfig::new("kafka");
        config.set("topic", "events");

        assert!(config.validate(&specs).is_ok());

        let empty_config = ConnectorConfig::new("kafka");
        assert!(empty_config.validate(&specs).is_err());
    }

    #[test]
    fn test_config_validate_required_key_with_default_is_satisfied() {
        let spec = ConfigKeySpec {
            key: "mode".to_string(),
            description: "Write mode".to_string(),
            required: true,
            default: Some("append".to_string()),
        };

        assert!(ConnectorConfig::new("kafka").validate(&[spec]).is_ok());
    }

    #[test]
    fn test_config_with_properties() {
        let mut props = HashMap::new();
        props.insert("key1".to_string(), "val1".to_string());
        props.insert("key2".to_string(), "val2".to_string());

        let config = ConnectorConfig::with_properties("test", props);
        assert_eq!(config.get("key1"), Some("val1"));
        assert_eq!(config.get("key2"), Some("val2"));
    }

    #[test]
    fn test_require_non_empty() {
        assert!(require_non_empty("events", "topic").is_ok());

        let err = require_non_empty("", "topic").unwrap_err();
        assert!(matches!(
            err,
            ConnectorError::ConfigurationError(ref msg) if msg == "topic must not be empty"
        ));
    }

    #[test]
    fn test_parse_port() {
        assert_eq!(parse_port("8080").unwrap(), 8080);
        assert!(parse_port("65536").is_err());
        assert!(parse_port("http").is_err());
        assert!(parse_port("").is_err());
    }

    #[test]
    fn test_connector_state_display() {
        assert_eq!(ConnectorState::Running.to_string(), "Running");
        assert_eq!(ConnectorState::Failed.to_string(), "Failed");
    }

    #[test]
    fn test_connector_state_display_all_variants() {
        let expected = [
            (ConnectorState::Created, "Created"),
            (ConnectorState::Initializing, "Initializing"),
            (ConnectorState::Running, "Running"),
            (ConnectorState::Paused, "Paused"),
            (ConnectorState::Recovering, "Recovering"),
            (ConnectorState::Closed, "Closed"),
            (ConnectorState::Failed, "Failed"),
        ];
        for (state, name) in expected {
            assert_eq!(state.to_string(), name);
        }
    }

    #[test]
    fn test_display_redacted_masks_secrets() {
        let mut config = ConnectorConfig::new("delta-lake");
        config.set("aws_region", "us-east-1");
        config.set("aws_secret_access_key", "TOP_SECRET");
        config.set("aws_access_key_id", "AKID123");

        let display = config.display_redacted();
        assert!(display.contains("aws_region=us-east-1"));
        assert!(display.contains("aws_secret_access_key=***"));
        assert!(display.contains("aws_access_key_id=AKID123"));
        assert!(!display.contains("TOP_SECRET"));
    }

    #[test]
    fn test_display_redacted_empty() {
        let config = ConnectorConfig::new("test");
        assert!(config.display_redacted().is_empty());
    }

    #[test]
    fn test_arrow_schema_ipc_round_trip() {
        use arrow_schema::{DataType, Field};

        let original = Schema::new(vec![
            Field::new("s", DataType::Utf8, true),
            Field::new("p", DataType::Float64, true),
            Field::new("q", DataType::Float64, true),
            Field::new("T", DataType::Int64, true),
        ]);
        let hex = encode_arrow_schema_ipc(&original);

        let mut config = ConnectorConfig::new("websocket");
        config.set("_arrow_schema", hex);

        let schema = config.arrow_schema().expect("should parse");
        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "s");
        assert_eq!(schema.field(0).data_type(), &DataType::Utf8);
        assert_eq!(schema.field(1).name(), "p");
        assert_eq!(schema.field(1).data_type(), &DataType::Float64);
        assert_eq!(schema.field(3).name(), "T");
        assert_eq!(schema.field(3).data_type(), &DataType::Int64);
    }

    #[test]
    fn test_arrow_schema_ipc_accepts_uppercase_hex() {
        use arrow_schema::{DataType, Field};

        let original = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
        let hex = encode_arrow_schema_ipc(&original).to_uppercase();

        let schema = decode_arrow_schema_ipc(&hex).expect("uppercase hex should decode");
        assert_eq!(schema.fields().len(), 1);
        assert_eq!(schema.field(0).name(), "id");
    }

    #[test]
    fn test_arrow_schema_ipc_handles_map_type() {
        use arrow_schema::{DataType, Field, Fields};

        let map_field = Field::new(
            "entries",
            DataType::Struct(Fields::from(vec![
                Field::new("key", DataType::Utf8, false),
                Field::new("value", DataType::Float64, true),
            ])),
            false,
        );
        let original = Schema::new(vec![
            Field::new("sensor_id", DataType::Utf8, true),
            Field::new("data", DataType::Map(Arc::new(map_field), false), true),
        ]);
        let hex = encode_arrow_schema_ipc(&original);

        let mut config = ConnectorConfig::new("kafka");
        config.set("_arrow_schema", hex);

        let schema = config.arrow_schema().expect("should parse Map type");
        assert_eq!(schema.fields().len(), 2);
        assert_eq!(schema.field(0).name(), "sensor_id");
        assert!(matches!(schema.field(1).data_type(), DataType::Map(_, _)));
    }

    #[test]
    fn test_decode_arrow_schema_rejects_malformed_hex() {
        // Odd number of hex digits.
        assert!(decode_arrow_schema_ipc("abc").is_none());
        // Characters outside the hex alphabet.
        assert!(decode_arrow_schema_ipc("zz").is_none());
        // Non-ASCII input.
        assert!(decode_arrow_schema_ipc("\u{e9}").is_none());
    }

    #[test]
    fn test_arrow_schema_returns_none_when_absent() {
        let config = ConnectorConfig::new("kafka");
        assert!(config.arrow_schema().is_none());
    }

    #[test]
    fn test_arrow_schema_returns_none_for_empty_value() {
        let mut config = ConnectorConfig::new("kafka");
        config.set("_arrow_schema", "");
        assert!(config.arrow_schema().is_none());
    }
}