laminar_connectors/schema/
error.rs1use thiserror::Error;
7
8use crate::error::ConnectorError;
9
10pub type SchemaResult<T> = Result<T, SchemaError>;
12
13#[derive(Debug, Error)]
15pub enum SchemaError {
16 #[error("inference failed: {0}")]
18 InferenceFailed(String),
19
20 #[error("incompatible schemas: {0}")]
22 Incompatible(String),
23
24 #[error("registry error: {0}")]
26 RegistryError(String),
27
28 #[error("decode error: {0}")]
30 DecodeError(String),
31
32 #[error("evolution rejected: {0}")]
34 EvolutionRejected(String),
35
36 #[error("missing config: {0}")]
38 MissingConfig(String),
39
40 #[error("invalid config key '{key}': {message}")]
42 InvalidConfig {
43 key: String,
45 message: String,
47 },
48
49 #[error("duplicate wildcard: only one `*` is allowed in the column list")]
51 DuplicateWildcard,
52
53 #[error("wildcard without inference: `*` requires a connector that supports schema inference or a schema registry")]
55 WildcardWithoutInference,
56
57 #[error("wildcard prefix collision: prefixed column '{0}' collides with a declared column")]
59 WildcardPrefixCollision(String),
60
61 #[error("wildcard expanded to zero new columns: all source columns are already declared")]
64 WildcardNoNewFields,
65
66 #[error("arrow error: {0}")]
68 Arrow(#[from] arrow_schema::ArrowError),
69
70 #[error(transparent)]
72 Other(Box<dyn std::error::Error + Send + Sync>),
73}
74
75impl From<ConnectorError> for SchemaError {
76 fn from(err: ConnectorError) -> Self {
77 match err {
78 ConnectorError::MissingConfig(msg) => SchemaError::MissingConfig(msg),
79 ConnectorError::ConfigurationError(msg) => SchemaError::InvalidConfig {
80 key: String::new(),
81 message: msg,
82 },
83 ConnectorError::SchemaMismatch(msg) => SchemaError::Incompatible(msg),
84 other => SchemaError::Other(Box::new(other)),
85 }
86 }
87}
88
89impl From<SchemaError> for ConnectorError {
90 fn from(err: SchemaError) -> Self {
91 match err {
92 SchemaError::MissingConfig(msg) => ConnectorError::MissingConfig(msg),
93 SchemaError::Incompatible(msg) => ConnectorError::SchemaMismatch(msg),
94 SchemaError::DecodeError(msg) => ConnectorError::ReadError(msg),
95 other => ConnectorError::Internal(other.to_string()),
96 }
97 }
98}
99
100#[cfg(test)]
101mod tests {
102 use super::*;
103
104 #[test]
105 fn test_schema_error_display() {
106 let err = SchemaError::InferenceFailed("too few samples".into());
107 assert_eq!(err.to_string(), "inference failed: too few samples");
108 }
109
110 #[test]
111 fn test_schema_error_invalid_config() {
112 let err = SchemaError::InvalidConfig {
113 key: "format".into(),
114 message: "unknown format 'xml'".into(),
115 };
116 assert!(err.to_string().contains("format"));
117 assert!(err.to_string().contains("unknown format"));
118 }
119
120 #[test]
121 fn test_connector_to_schema_error() {
122 let ce = ConnectorError::MissingConfig("topic".into());
123 let se: SchemaError = ce.into();
124 assert!(matches!(se, SchemaError::MissingConfig(ref m) if m == "topic"));
125 }
126
127 #[test]
128 fn test_schema_to_connector_error() {
129 let se = SchemaError::Incompatible("field type mismatch".into());
130 let ce: ConnectorError = se.into();
131 assert!(matches!(ce, ConnectorError::SchemaMismatch(_)));
132 }
133
134 #[test]
135 fn test_schema_error_from_arrow() {
136 let arrow_err = arrow_schema::ArrowError::SchemaError("bad schema".into());
137 let se: SchemaError = arrow_err.into();
138 assert!(matches!(se, SchemaError::Arrow(_)));
139 assert!(se.to_string().contains("bad schema"));
140 }
141
142 #[test]
143 fn test_other_connector_error_wraps() {
144 let ce = ConnectorError::ConnectionFailed("host down".into());
145 let se: SchemaError = ce.into();
146 assert!(matches!(se, SchemaError::Other(_)));
147 assert!(se.to_string().contains("host down"));
148 }
149
150 #[test]
151 fn test_wildcard_errors_display() {
152 let e1 = SchemaError::DuplicateWildcard;
153 assert!(e1.to_string().contains("duplicate wildcard"));
154
155 let e2 = SchemaError::WildcardWithoutInference;
156 assert!(e2.to_string().contains("schema inference"));
157
158 let e3 = SchemaError::WildcardPrefixCollision("src_id".into());
159 assert!(e3.to_string().contains("src_id"));
160
161 let e4 = SchemaError::WildcardNoNewFields;
162 assert!(e4.to_string().contains("zero new columns"));
163 }
164}