1#![allow(clippy::disallowed_types)] use std::collections::HashMap;
4use std::fmt;
5use std::sync::Arc;
6
7use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
8use arrow_schema::{Schema, SchemaRef};
9
10use crate::error::ConnectorError;
11
12#[must_use]
14pub fn encode_arrow_schema_ipc(schema: &Schema) -> String {
15 let mut dt = DictionaryTracker::new(false);
16 let enc = IpcDataGenerator::default().schema_to_bytes_with_dictionary_tracker(
17 schema,
18 &mut dt,
19 &IpcWriteOptions::default(),
20 );
21 let bytes = &enc.ipc_message;
22 let mut s = String::with_capacity(bytes.len() * 2);
23 for b in bytes {
24 use std::fmt::Write;
25 let _ = write!(s, "{b:02x}");
26 }
27 s
28}
29
30#[must_use]
32pub fn decode_arrow_schema_ipc(hex: &str) -> Option<Schema> {
33 let bytes: Option<Vec<u8>> = (0..hex.len())
34 .step_by(2)
35 .map(|i| {
36 hex.get(i..i + 2)
37 .and_then(|h| u8::from_str_radix(h, 16).ok())
38 })
39 .collect();
40 arrow_ipc::convert::try_schema_from_flatbuffer_bytes(&bytes?).ok()
41}
42
/// String key/value configuration for a connector instance, tagged with the
/// connector type it was created for.
#[derive(Debug, Clone, Default)]
pub struct ConnectorConfig {
    /// Connector type identifier (the tests in this file use values such as
    /// `"kafka"` and `"delta-lake"`).
    connector_type: String,

    /// Raw, unparsed property values keyed by property name.
    properties: HashMap<String, String>,
}
55
56impl ConnectorConfig {
57 #[must_use]
59 pub fn new(connector_type: impl Into<String>) -> Self {
60 Self {
61 connector_type: connector_type.into(),
62 properties: HashMap::new(),
63 }
64 }
65
66 #[must_use]
68 pub fn with_properties(
69 connector_type: impl Into<String>,
70 properties: HashMap<String, String>,
71 ) -> Self {
72 Self {
73 connector_type: connector_type.into(),
74 properties,
75 }
76 }
77
78 #[must_use]
80 pub fn connector_type(&self) -> &str {
81 &self.connector_type
82 }
83
84 pub fn set(&mut self, key: impl Into<String>, value: impl Into<String>) {
86 self.properties.insert(key.into(), value.into());
87 }
88
89 #[must_use]
91 pub fn get(&self, key: &str) -> Option<&str> {
92 self.properties.get(key).map(String::as_str)
93 }
94
95 pub fn require(&self, key: &str) -> Result<&str, ConnectorError> {
101 self.get(key)
102 .ok_or_else(|| ConnectorError::missing_config(key.to_string()))
103 }
104
105 pub fn get_parsed<T: std::str::FromStr>(&self, key: &str) -> Result<Option<T>, ConnectorError>
111 where
112 T::Err: fmt::Display,
113 {
114 match self.get(key) {
115 Some(v) => v.parse::<T>().map(Some).map_err(|e| {
116 ConnectorError::ConfigurationError(format!("invalid value for '{key}': {e}"))
117 }),
118 None => Ok(None),
119 }
120 }
121
122 pub fn require_parsed<T: std::str::FromStr>(&self, key: &str) -> Result<T, ConnectorError>
129 where
130 T::Err: fmt::Display,
131 {
132 let value = self.require(key)?;
133 value.parse::<T>().map_err(|e| {
134 ConnectorError::ConfigurationError(format!("invalid value for '{key}': {e}"))
135 })
136 }
137
138 #[must_use]
140 pub fn properties(&self) -> &HashMap<String, String> {
141 &self.properties
142 }
143
144 #[must_use]
146 pub fn properties_with_prefix(&self, prefix: &str) -> HashMap<String, String> {
147 self.properties
148 .iter()
149 .filter_map(|(k, v)| {
150 k.strip_prefix(prefix)
151 .map(|stripped| (stripped.to_string(), v.clone()))
152 })
153 .collect()
154 }
155
156 #[must_use]
158 pub fn arrow_schema(&self) -> Option<SchemaRef> {
159 let s = self.get("_arrow_schema")?;
160 decode_arrow_schema_ipc(s).map(Arc::new)
161 }
162
163 #[cfg(test)]
168 #[must_use]
169 pub fn display_redacted(&self) -> String {
170 use crate::storage::SecretMasker;
171 SecretMasker::display_map(&self.properties)
172 }
173
174 pub fn validate(&self, specs: &[ConfigKeySpec]) -> Result<(), ConnectorError> {
181 for spec in specs {
182 if spec.required && self.get(&spec.key).is_none() {
183 if let Some(ref default) = spec.default {
184 let _ = default;
186 } else {
187 return Err(ConnectorError::missing_config(spec.key.clone()));
188 }
189 }
190 }
191 Ok(())
192 }
193}
194
195pub fn require_non_empty(value: &str, field_name: &str) -> Result<(), ConnectorError> {
201 if value.is_empty() {
202 return Err(ConnectorError::ConfigurationError(format!(
203 "{field_name} must not be empty",
204 )));
205 }
206 Ok(())
207}
208
209pub fn parse_port(value: &str) -> Result<u16, ConnectorError> {
215 value
216 .parse()
217 .map_err(|_| ConnectorError::ConfigurationError(format!("invalid port: '{value}'")))
218}
219
/// Describes one configuration key that a connector understands.
#[derive(Debug, Clone)]
pub struct ConfigKeySpec {
    /// Property name looked up in a `ConnectorConfig`.
    pub key: String,

    /// Human-readable description of the key.
    pub description: String,

    /// Whether `ConnectorConfig::validate` requires the key to be present
    /// (a spec that also has a default is treated as satisfied).
    pub required: bool,

    /// Default value. [`ConfigKeySpec::optional`] always sets one, and
    /// [`ConfigKeySpec::required`] never does.
    pub default: Option<String>,
}

impl ConfigKeySpec {
    /// Builds a spec for a mandatory key with no default.
    #[must_use]
    pub fn required(key: impl Into<String>, description: impl Into<String>) -> Self {
        Self::from_parts(key.into(), description.into(), None)
    }

    /// Builds a spec for an optional key that falls back to `default`.
    #[must_use]
    pub fn optional(
        key: impl Into<String>,
        description: impl Into<String>,
        default: impl Into<String>,
    ) -> Self {
        Self::from_parts(key.into(), description.into(), Some(default.into()))
    }

    /// Shared constructor: a key is required exactly when it has no default.
    fn from_parts(key: String, description: String, default: Option<String>) -> Self {
        Self {
            required: default.is_none(),
            key,
            description,
            default,
        }
    }
}
265
/// Descriptive metadata about a connector implementation.
#[derive(Debug, Clone)]
pub struct ConnectorInfo {
    /// Connector identifier. Presumably the same value used as
    /// `ConnectorConfig::connector_type` — TODO confirm against callers.
    pub name: String,

    /// Human-readable name for the connector.
    pub display_name: String,

    /// Version string of the connector (format not established here).
    pub version: String,

    /// Whether the connector can act as a source.
    pub is_source: bool,

    /// Whether the connector can act as a sink.
    pub is_sink: bool,

    /// Configuration keys the connector declares (see `ConfigKeySpec`).
    pub config_keys: Vec<ConfigKeySpec>,
}
287
/// Lifecycle state of a connector.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectorState {
    Created,

    Initializing,

    Running,

    Paused,

    Recovering,

    Closed,

    Failed,
}

impl ConnectorState {
    /// Returns the variant name as a static string. `Display` emits the same
    /// text.
    #[must_use]
    pub const fn as_str(&self) -> &'static str {
        match self {
            ConnectorState::Created => "Created",
            ConnectorState::Initializing => "Initializing",
            ConnectorState::Running => "Running",
            ConnectorState::Paused => "Paused",
            ConnectorState::Recovering => "Recovering",
            ConnectorState::Closed => "Closed",
            ConnectorState::Failed => "Failed",
        }
    }
}

impl fmt::Display for ConnectorState {
    /// Writes the variant name.
    ///
    /// Uses `Formatter::pad` so width, fill and alignment flags (e.g. `{:<12}`)
    /// are honored. `write!` with a literal would ignore them.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad(self.as_str())
    }
}
326
// Unit tests for the configuration types and the Arrow schema hex helpers
// defined in this file.
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::Schema;

    // set/get round-trip, the connector-type accessor, and `None` for an
    // unknown key.
    #[test]
    fn test_config_basic_operations() {
        let mut config = ConnectorConfig::new("kafka");
        config.set("bootstrap.servers", "localhost:9092");
        config.set("topic", "events");

        assert_eq!(config.connector_type(), "kafka");
        assert_eq!(config.get("bootstrap.servers"), Some("localhost:9092"));
        assert_eq!(config.get("topic"), Some("events"));
        assert_eq!(config.get("missing"), None);
    }

    // `require` succeeds for a present key and errors for an absent one.
    #[test]
    fn test_config_require() {
        let mut config = ConnectorConfig::new("kafka");
        config.set("topic", "events");

        assert!(config.require("topic").is_ok());
        assert!(config.require("missing").is_err());
    }

    // `get_parsed`: a present valid value gives Some, an absent key gives
    // Ok(None), an unparsable value gives Err.
    #[test]
    fn test_config_parsed() {
        let mut config = ConnectorConfig::new("kafka");
        config.set("batch.size", "1000");
        config.set("bad_number", "not_a_number");

        let size: Option<usize> = config.get_parsed("batch.size").unwrap();
        assert_eq!(size, Some(1000));

        let missing: Option<usize> = config.get_parsed("missing").unwrap();
        assert_eq!(missing, None);

        let bad: Result<Option<usize>, _> = config.get_parsed("bad_number");
        assert!(bad.is_err());
    }

    // `require_parsed` parses a present value and errors on an absent key.
    #[test]
    fn test_config_require_parsed() {
        let mut config = ConnectorConfig::new("test");
        config.set("port", "8080");

        let port: u16 = config.require_parsed("port").unwrap();
        assert_eq!(port, 8080);

        let missing: Result<u16, _> = config.require_parsed("missing");
        assert!(missing.is_err());
    }

    // Only keys with the prefix are returned, and the prefix is stripped.
    #[test]
    fn test_config_prefix_extraction() {
        let mut config = ConnectorConfig::new("kafka");
        config.set("kafka.bootstrap.servers", "localhost:9092");
        config.set("kafka.group.id", "my-group");
        config.set("topic", "events");

        let kafka_props = config.properties_with_prefix("kafka.");
        assert_eq!(kafka_props.len(), 2);
        assert_eq!(
            kafka_props.get("bootstrap.servers"),
            Some(&"localhost:9092".to_string())
        );
        assert_eq!(kafka_props.get("group.id"), Some(&"my-group".to_string()));
    }

    // `validate` passes when every required key is set (optional keys may be
    // absent) and fails when a required key is missing.
    #[test]
    fn test_config_validate() {
        let specs = vec![
            ConfigKeySpec::required("topic", "Kafka topic"),
            ConfigKeySpec::optional("batch.size", "Batch size", "100"),
        ];

        let mut config = ConnectorConfig::new("kafka");
        config.set("topic", "events");

        assert!(config.validate(&specs).is_ok());

        let empty_config = ConnectorConfig::new("kafka");
        assert!(empty_config.validate(&specs).is_err());
    }

    // Properties passed to `with_properties` are readable via `get`.
    #[test]
    fn test_config_with_properties() {
        let mut props = HashMap::new();
        props.insert("key1".to_string(), "val1".to_string());
        props.insert("key2".to_string(), "val2".to_string());

        let config = ConnectorConfig::with_properties("test", props);
        assert_eq!(config.get("key1"), Some("val1"));
        assert_eq!(config.get("key2"), Some("val2"));
    }

    // `Display` renders the variant name.
    #[test]
    fn test_connector_state_display() {
        assert_eq!(ConnectorState::Running.to_string(), "Running");
        assert_eq!(ConnectorState::Failed.to_string(), "Failed");
    }

    // The secret-access-key value is masked as `***`; the other keys in this
    // test (including the access key id) are shown verbatim.
    #[test]
    fn test_display_redacted_masks_secrets() {
        let mut config = ConnectorConfig::new("delta-lake");
        config.set("aws_region", "us-east-1");
        config.set("aws_secret_access_key", "TOP_SECRET");
        config.set("aws_access_key_id", "AKID123");

        let display = config.display_redacted();
        assert!(display.contains("aws_region=us-east-1"));
        assert!(display.contains("aws_secret_access_key=***"));
        assert!(display.contains("aws_access_key_id=AKID123"));
        assert!(!display.contains("TOP_SECRET"));
    }

    // A config with no properties renders as an empty string.
    #[test]
    fn test_display_redacted_empty() {
        let config = ConnectorConfig::new("test");
        assert!(config.display_redacted().is_empty());
    }

    // encode -> store under `_arrow_schema` -> `arrow_schema()` preserves
    // field names and primitive types.
    #[test]
    fn test_arrow_schema_ipc_round_trip() {
        use arrow_schema::{DataType, Field};

        let original = Schema::new(vec![
            Field::new("s", DataType::Utf8, true),
            Field::new("p", DataType::Float64, true),
            Field::new("q", DataType::Float64, true),
            Field::new("T", DataType::Int64, true),
        ]);
        let hex = encode_arrow_schema_ipc(&original);

        let mut config = ConnectorConfig::new("websocket");
        config.set("_arrow_schema", hex);

        let schema = config.arrow_schema().expect("should parse");
        assert_eq!(schema.fields().len(), 4);
        assert_eq!(schema.field(0).name(), "s");
        assert_eq!(schema.field(0).data_type(), &DataType::Utf8);
        assert_eq!(schema.field(1).name(), "p");
        assert_eq!(schema.field(1).data_type(), &DataType::Float64);
        assert_eq!(schema.field(3).name(), "T");
        assert_eq!(schema.field(3).data_type(), &DataType::Int64);
    }

    // Nested types survive the round-trip: a Map field decodes back as a Map.
    #[test]
    fn test_arrow_schema_ipc_handles_map_type() {
        use arrow_schema::{DataType, Field, Fields};

        let map_field = Field::new(
            "entries",
            DataType::Struct(Fields::from(vec![
                Field::new("key", DataType::Utf8, false),
                Field::new("value", DataType::Float64, true),
            ])),
            false,
        );
        let original = Schema::new(vec![
            Field::new("sensor_id", DataType::Utf8, true),
            Field::new("data", DataType::Map(Arc::new(map_field), false), true),
        ]);
        let hex = encode_arrow_schema_ipc(&original);

        let mut config = ConnectorConfig::new("kafka");
        config.set("_arrow_schema", hex);

        let schema = config.arrow_schema().expect("should parse Map type");
        assert_eq!(schema.fields().len(), 2);
        assert_eq!(schema.field(0).name(), "sensor_id");
        assert!(matches!(schema.field(1).data_type(), DataType::Map(_, _)));
    }

    // No `_arrow_schema` property gives `None`.
    #[test]
    fn test_arrow_schema_returns_none_when_absent() {
        let config = ConnectorConfig::new("kafka");
        assert!(config.arrow_schema().is_none());
    }

    // An empty `_arrow_schema` value decodes to zero bytes, which does not
    // parse as a schema, so the result is `None`.
    #[test]
    fn test_arrow_schema_returns_none_for_empty_value() {
        let mut config = ConnectorConfig::new("kafka");
        config.set("_arrow_schema", "");
        assert!(config.arrow_schema().is_none());
    }
}
514}