1#![allow(clippy::disallowed_types)] use std::collections::HashMap;
11use std::fmt;
12use std::sync::Arc;
13
14use arrow_schema::{DataType, Field, Schema, SchemaRef};
15
16use crate::error::ConnectorError;
17
/// Configuration for a connector: a connector type identifier plus a
/// free-form map of string properties.
///
/// Values are stored as raw strings; typed access goes through
/// [`ConnectorConfig::get_parsed`] / [`ConnectorConfig::require_parsed`].
#[derive(Debug, Clone, Default)]
pub struct ConnectorConfig {
    /// Connector type identifier (e.g. `"kafka"`).
    connector_type: String,

    /// Raw configuration properties, keyed by property name.
    properties: HashMap<String, String>,
}
30
31impl ConnectorConfig {
32 #[must_use]
34 pub fn new(connector_type: impl Into<String>) -> Self {
35 Self {
36 connector_type: connector_type.into(),
37 properties: HashMap::new(),
38 }
39 }
40
41 #[must_use]
43 pub fn with_properties(
44 connector_type: impl Into<String>,
45 properties: HashMap<String, String>,
46 ) -> Self {
47 Self {
48 connector_type: connector_type.into(),
49 properties,
50 }
51 }
52
53 #[must_use]
55 pub fn connector_type(&self) -> &str {
56 &self.connector_type
57 }
58
59 pub fn set(&mut self, key: impl Into<String>, value: impl Into<String>) {
61 self.properties.insert(key.into(), value.into());
62 }
63
64 #[must_use]
66 pub fn get(&self, key: &str) -> Option<&str> {
67 self.properties.get(key).map(String::as_str)
68 }
69
70 pub fn require(&self, key: &str) -> Result<&str, ConnectorError> {
76 self.get(key)
77 .ok_or_else(|| ConnectorError::MissingConfig(key.to_string()))
78 }
79
80 pub fn get_parsed<T: std::str::FromStr>(&self, key: &str) -> Result<Option<T>, ConnectorError>
86 where
87 T::Err: fmt::Display,
88 {
89 match self.get(key) {
90 Some(v) => v.parse::<T>().map(Some).map_err(|e| {
91 ConnectorError::ConfigurationError(format!("invalid value for '{key}': {e}"))
92 }),
93 None => Ok(None),
94 }
95 }
96
97 pub fn require_parsed<T: std::str::FromStr>(&self, key: &str) -> Result<T, ConnectorError>
104 where
105 T::Err: fmt::Display,
106 {
107 let value = self.require(key)?;
108 value.parse::<T>().map_err(|e| {
109 ConnectorError::ConfigurationError(format!("invalid value for '{key}': {e}"))
110 })
111 }
112
113 #[must_use]
115 pub fn properties(&self) -> &HashMap<String, String> {
116 &self.properties
117 }
118
119 #[must_use]
121 pub fn properties_with_prefix(&self, prefix: &str) -> HashMap<String, String> {
122 self.properties
123 .iter()
124 .filter_map(|(k, v)| {
125 k.strip_prefix(prefix)
126 .map(|stripped| (stripped.to_string(), v.clone()))
127 })
128 .collect()
129 }
130
131 #[must_use]
140 pub fn arrow_schema(&self) -> Option<SchemaRef> {
141 let s = self.get("_arrow_schema")?;
142 let fields: Vec<Field> = s
143 .split(',')
144 .filter(|part| !part.is_empty())
145 .filter_map(|part| {
146 let (name, type_str) = part.split_once(':')?;
147 let dt = match type_str {
148 "Utf8" => DataType::Utf8,
149 "LargeUtf8" => DataType::LargeUtf8,
150 "Float64" => DataType::Float64,
151 "Float32" => DataType::Float32,
152 "Int64" => DataType::Int64,
153 "Int32" => DataType::Int32,
154 "Int16" => DataType::Int16,
155 "Int8" => DataType::Int8,
156 "UInt64" => DataType::UInt64,
157 "UInt32" => DataType::UInt32,
158 "Boolean" => DataType::Boolean,
159 "Date32" => DataType::Date32,
160 "Date64" => DataType::Date64,
161 "Timestamp(Millisecond)" => {
162 DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None)
163 }
164 "Timestamp(Microsecond)" => {
165 DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None)
166 }
167 "Timestamp(Nanosecond)" => {
168 DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None)
169 }
170 "Timestamp(Second)" => {
171 DataType::Timestamp(arrow_schema::TimeUnit::Second, None)
172 }
173 other => {
174 tracing::warn!(
175 type_str = other,
176 field = name,
177 "unknown Arrow type in _arrow_schema — column skipped"
178 );
179 return None;
180 }
181 };
182 Some(Field::new(name, dt, true))
183 })
184 .collect();
185
186 if fields.is_empty() {
187 None
188 } else {
189 Some(Arc::new(Schema::new(fields)))
190 }
191 }
192
193 #[cfg(test)]
198 #[must_use]
199 pub fn display_redacted(&self) -> String {
200 use crate::storage::SecretMasker;
201 SecretMasker::display_map(&self.properties)
202 }
203
204 pub fn validate(&self, specs: &[ConfigKeySpec]) -> Result<(), ConnectorError> {
211 for spec in specs {
212 if spec.required && self.get(&spec.key).is_none() {
213 if let Some(ref default) = spec.default {
214 let _ = default;
216 } else {
217 return Err(ConnectorError::MissingConfig(spec.key.clone()));
218 }
219 }
220 }
221 Ok(())
222 }
223}
224
225pub fn require_non_empty(value: &str, field_name: &str) -> Result<(), ConnectorError> {
231 if value.is_empty() {
232 return Err(ConnectorError::ConfigurationError(format!(
233 "{field_name} must not be empty",
234 )));
235 }
236 Ok(())
237}
238
239pub fn parse_port(value: &str) -> Result<u16, ConnectorError> {
245 value
246 .parse()
247 .map_err(|_| ConnectorError::ConfigurationError(format!("invalid port: '{value}'")))
248}
249
/// Describes one configuration key a connector understands.
#[derive(Debug, Clone)]
pub struct ConfigKeySpec {
    /// Property name, looked up in a `ConnectorConfig` during validation.
    pub key: String,

    /// Human-readable description of the key.
    pub description: String,

    /// Whether the key must be supplied.
    pub required: bool,

    /// Default value, if any. `ConnectorConfig::validate` treats a required
    /// key that has a default as satisfied when the key is absent.
    pub default: Option<String>,
}

impl ConfigKeySpec {
    /// Builds a spec for a mandatory key that has no default.
    #[must_use]
    pub fn required(key: impl Into<String>, description: impl Into<String>) -> Self {
        let key = key.into();
        let description = description.into();
        Self {
            key,
            description,
            required: true,
            default: None,
        }
    }

    /// Builds a spec for an optional key that falls back to `default`.
    #[must_use]
    pub fn optional(
        key: impl Into<String>,
        description: impl Into<String>,
        default: impl Into<String>,
    ) -> Self {
        // Start from the required form, then relax it; conversions still run
        // in key, description, default order.
        let mut spec = Self::required(key, description);
        spec.required = false;
        spec.default = Some(default.into());
        spec
    }
}
295
/// Descriptive metadata about a connector implementation.
#[derive(Debug, Clone)]
pub struct ConnectorInfo {
    /// Connector identifier. Presumably matches
    /// `ConnectorConfig::connector_type()` for this connector; TODO confirm.
    pub name: String,

    /// Human-readable name for display.
    pub display_name: String,

    /// Connector version string.
    pub version: String,

    /// Whether the connector can be used as a source.
    pub is_source: bool,

    /// Whether the connector can be used as a sink.
    pub is_sink: bool,

    /// Configuration keys the connector declares; can be checked with
    /// `ConnectorConfig::validate`.
    pub config_keys: Vec<ConfigKeySpec>,
}
317
/// Lifecycle state of a connector.
///
/// The `Display` form is the variant name (e.g. `Running`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectorState {
    /// The connector has been created.
    Created,

    /// The connector is initializing.
    Initializing,

    /// The connector is running.
    Running,

    /// The connector is paused.
    Paused,

    /// The connector is recovering.
    Recovering,

    /// The connector is closed.
    Closed,

    /// The connector has failed.
    Failed,
}

impl fmt::Display for ConnectorState {
    /// Writes the variant name and honours width, fill, alignment and precision
    /// flags, so e.g. `{:<12}` lines states up in tabular log output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            ConnectorState::Created => "Created",
            ConnectorState::Initializing => "Initializing",
            ConnectorState::Running => "Running",
            ConnectorState::Paused => "Paused",
            ConnectorState::Recovering => "Recovering",
            ConnectorState::Closed => "Closed",
            ConnectorState::Failed => "Failed",
        };
        // `write!(f, "...")` ignores formatting flags; `pad` applies them.
        f.pad(name)
    }
}
356
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_config_basic_operations() {
        let mut config = ConnectorConfig::new("kafka");
        config.set("bootstrap.servers", "localhost:9092");
        config.set("topic", "events");

        assert_eq!(config.connector_type(), "kafka");
        assert_eq!(config.get("bootstrap.servers"), Some("localhost:9092"));
        assert_eq!(config.get("topic"), Some("events"));
        assert_eq!(config.get("missing"), None);
    }

    #[test]
    fn test_config_set_overwrites_existing_value() {
        let mut config = ConnectorConfig::new("kafka");
        config.set("topic", "a");
        config.set("topic", "b");

        assert_eq!(config.get("topic"), Some("b"));
        assert_eq!(config.properties().len(), 1);
    }

    #[test]
    fn test_config_require() {
        let mut config = ConnectorConfig::new("kafka");
        config.set("topic", "events");

        assert!(config.require("topic").is_ok());
        assert!(config.require("missing").is_err());
    }

    #[test]
    fn test_config_require_reports_missing_key() {
        let config = ConnectorConfig::new("kafka");
        let err = config.require("topic").unwrap_err();
        assert!(matches!(err, ConnectorError::MissingConfig(ref key) if key == "topic"));
    }

    #[test]
    fn test_config_parsed() {
        let mut config = ConnectorConfig::new("kafka");
        config.set("batch.size", "1000");
        config.set("bad_number", "not_a_number");

        let size: Option<usize> = config.get_parsed("batch.size").unwrap();
        assert_eq!(size, Some(1000));

        let missing: Option<usize> = config.get_parsed("missing").unwrap();
        assert_eq!(missing, None);

        let bad: Result<Option<usize>, _> = config.get_parsed("bad_number");
        assert!(bad.is_err());
    }

    #[test]
    fn test_config_require_parsed() {
        let mut config = ConnectorConfig::new("test");
        config.set("port", "8080");

        let port: u16 = config.require_parsed("port").unwrap();
        assert_eq!(port, 8080);

        let missing: Result<u16, _> = config.require_parsed("missing");
        assert!(missing.is_err());
    }

    #[test]
    fn test_config_require_parsed_error_variants() {
        let mut config = ConnectorConfig::new("test");
        config.set("port", "not_a_port");

        let bad: Result<u16, _> = config.require_parsed("port");
        assert!(matches!(bad, Err(ConnectorError::ConfigurationError(_))));

        let missing: Result<u16, _> = config.require_parsed("missing");
        assert!(matches!(missing, Err(ConnectorError::MissingConfig(_))));
    }

    #[test]
    fn test_config_prefix_extraction() {
        let mut config = ConnectorConfig::new("kafka");
        config.set("kafka.bootstrap.servers", "localhost:9092");
        config.set("kafka.group.id", "my-group");
        config.set("topic", "events");

        let kafka_props = config.properties_with_prefix("kafka.");
        assert_eq!(kafka_props.len(), 2);
        assert_eq!(
            kafka_props.get("bootstrap.servers"),
            Some(&"localhost:9092".to_string())
        );
        assert_eq!(kafka_props.get("group.id"), Some(&"my-group".to_string()));
    }

    #[test]
    fn test_config_prefix_without_matches_is_empty() {
        let mut config = ConnectorConfig::new("kafka");
        config.set("topic", "events");
        assert!(config.properties_with_prefix("kafka.").is_empty());
    }

    #[test]
    fn test_config_validate() {
        let specs = vec![
            ConfigKeySpec::required("topic", "Kafka topic"),
            ConfigKeySpec::optional("batch.size", "Batch size", "100"),
        ];

        let mut config = ConnectorConfig::new("kafka");
        config.set("topic", "events");

        assert!(config.validate(&specs).is_ok());

        let empty_config = ConnectorConfig::new("kafka");
        assert!(empty_config.validate(&specs).is_err());
    }

    #[test]
    fn test_config_validate_required_with_default_is_satisfied() {
        let specs = vec![ConfigKeySpec {
            key: "mode".to_string(),
            description: "Mode".to_string(),
            required: true,
            default: Some("fast".to_string()),
        }];
        assert!(ConnectorConfig::new("test").validate(&specs).is_ok());
    }

    #[test]
    fn test_config_validate_reports_first_missing_key() {
        let specs = vec![
            ConfigKeySpec::optional("batch.size", "Batch size", "100"),
            ConfigKeySpec::required("topic", "Kafka topic"),
            ConfigKeySpec::required("group.id", "Consumer group"),
        ];
        let err = ConnectorConfig::new("kafka").validate(&specs).unwrap_err();
        assert!(matches!(err, ConnectorError::MissingConfig(ref key) if key == "topic"));
    }

    #[test]
    fn test_config_with_properties() {
        let mut props = HashMap::new();
        props.insert("key1".to_string(), "val1".to_string());
        props.insert("key2".to_string(), "val2".to_string());

        let config = ConnectorConfig::with_properties("test", props);
        assert_eq!(config.get("key1"), Some("val1"));
        assert_eq!(config.get("key2"), Some("val2"));
    }

    #[test]
    fn test_require_non_empty() {
        assert!(require_non_empty("value", "name").is_ok());
        assert!(require_non_empty(" ", "name").is_ok());
        assert!(matches!(
            require_non_empty("", "name"),
            Err(ConnectorError::ConfigurationError(_))
        ));
    }

    #[test]
    fn test_parse_port() {
        assert_eq!(parse_port("8080").unwrap(), 8080);
        assert_eq!(parse_port("65535").unwrap(), 65535);
        assert!(parse_port("65536").is_err());
        assert!(parse_port("-1").is_err());
        assert!(parse_port("http").is_err());
        assert!(parse_port("").is_err());
    }

    #[test]
    fn test_connector_state_display() {
        assert_eq!(ConnectorState::Created.to_string(), "Created");
        assert_eq!(ConnectorState::Initializing.to_string(), "Initializing");
        assert_eq!(ConnectorState::Running.to_string(), "Running");
        assert_eq!(ConnectorState::Paused.to_string(), "Paused");
        assert_eq!(ConnectorState::Recovering.to_string(), "Recovering");
        assert_eq!(ConnectorState::Closed.to_string(), "Closed");
        assert_eq!(ConnectorState::Failed.to_string(), "Failed");
    }

    #[test]
    fn test_display_redacted_masks_secrets() {
        let mut config = ConnectorConfig::new("delta-lake");
        config.set("aws_region", "us-east-1");
        config.set("aws_secret_access_key", "TOP_SECRET");
        config.set("aws_access_key_id", "AKID123");

        let display = config.display_redacted();
        assert!(display.contains("aws_region=us-east-1"));
        assert!(display.contains("aws_secret_access_key=***"));
        assert!(display.contains("aws_access_key_id=AKID123"));
        assert!(!display.contains("TOP_SECRET"));
    }

    #[test]
    fn test_display_redacted_empty() {
        let config = ConnectorConfig::new("test");
        assert!(config.display_redacted().is_empty());
    }

    #[test]
    fn test_arrow_schema_parses_compact_encoding() {
        let mut config = ConnectorConfig::new("websocket");
        config.set("_arrow_schema", "s:Utf8,p:Float64,q:Float64,T:Int64");

        let schema = config.arrow_schema().expect("should parse");
        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "s");
        assert_eq!(schema.field(0).data_type(), &DataType::Utf8);
        assert_eq!(schema.field(1).name(), "p");
        assert_eq!(schema.field(1).data_type(), &DataType::Float64);
        assert_eq!(schema.field(3).name(), "T");
        assert_eq!(schema.field(3).data_type(), &DataType::Int64);
        assert!(schema.field(0).is_nullable());
    }

    #[test]
    fn test_arrow_schema_parses_timestamps() {
        let mut config = ConnectorConfig::new("websocket");
        config.set(
            "_arrow_schema",
            "ms:Timestamp(Millisecond),ns:Timestamp(Nanosecond)",
        );

        let schema = config.arrow_schema().expect("should parse");
        assert_eq!(schema.fields().len(), 2);
        assert_eq!(
            schema.field(0).data_type(),
            &DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None)
        );
        assert_eq!(
            schema.field(1).data_type(),
            &DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None)
        );
    }

    #[test]
    fn test_arrow_schema_skips_unknown_types() {
        let mut config = ConnectorConfig::new("websocket");
        config.set("_arrow_schema", "a:Utf8,b:NotAType,c:Int32");

        let schema = config.arrow_schema().expect("should parse");
        assert_eq!(schema.fields().len(), 2);
        assert_eq!(schema.field(0).name(), "a");
        assert_eq!(schema.field(1).name(), "c");
        assert_eq!(schema.field(1).data_type(), &DataType::Int32);
    }

    #[test]
    fn test_arrow_schema_skips_entries_without_type() {
        let mut config = ConnectorConfig::new("websocket");
        config.set("_arrow_schema", "a:Utf8,oops,,b:Boolean");

        let schema = config.arrow_schema().expect("should parse");
        assert_eq!(schema.fields().len(), 2);
        assert_eq!(schema.field(0).name(), "a");
        assert_eq!(schema.field(1).name(), "b");
        assert_eq!(schema.field(1).data_type(), &DataType::Boolean);
    }

    #[test]
    fn test_arrow_schema_returns_none_when_nothing_decodes() {
        let mut config = ConnectorConfig::new("websocket");
        config.set("_arrow_schema", "a:NotAType,oops");
        assert!(config.arrow_schema().is_none());
    }

    #[test]
    fn test_arrow_schema_returns_none_when_absent() {
        let config = ConnectorConfig::new("kafka");
        assert!(config.arrow_schema().is_none());
    }

    #[test]
    fn test_arrow_schema_returns_none_for_empty_value() {
        let mut config = ConnectorConfig::new("kafka");
        config.set("_arrow_schema", "");
        assert!(config.arrow_schema().is_none());
    }
}