laminar_connectors/schema/
error.rs1use thiserror::Error;
7
8use crate::error::ConnectorError;
9
10pub type SchemaResult<T> = Result<T, SchemaError>;
12
13#[derive(Debug, Error)]
15pub enum SchemaError {
16 #[error("inference failed: {0}")]
18 InferenceFailed(String),
19
20 #[error("incompatible schemas: {0}")]
22 Incompatible(String),
23
24 #[error("registry error: {0}")]
26 RegistryError(String),
27
28 #[error("decode error: {0}")]
30 DecodeError(String),
31
32 #[error("evolution rejected: {0}")]
34 EvolutionRejected(String),
35
36 #[error("missing config: {0}")]
38 MissingConfig(String),
39
40 #[error("invalid config key '{key}': {message}")]
42 InvalidConfig {
43 key: String,
45 message: String,
47 },
48
49 #[error("duplicate wildcard: only one `*` is allowed in the column list")]
51 DuplicateWildcard,
52
53 #[error(
55 "wildcard without resolution: `*` requires a connector with a schema provider or registry"
56 )]
57 WildcardWithoutResolution,
58
59 #[error("wildcard prefix collision: prefixed column '{0}' collides with a declared column")]
61 WildcardPrefixCollision(String),
62
63 #[error("wildcard expanded to zero new columns: all source columns are already declared")]
66 WildcardNoNewFields,
67
68 #[error("arrow error: {0}")]
70 Arrow(#[from] arrow_schema::ArrowError),
71
72 #[error(transparent)]
74 Other(Box<dyn std::error::Error + Send + Sync>),
75}
76
77impl From<ConnectorError> for SchemaError {
78 fn from(err: ConnectorError) -> Self {
79 match err {
80 ConnectorError::ConfigurationError(msg) => SchemaError::InvalidConfig {
83 key: String::new(),
84 message: msg,
85 },
86 ConnectorError::SchemaMismatch(msg) => SchemaError::Incompatible(msg),
87 other => SchemaError::Other(Box::new(other)),
88 }
89 }
90}
91
92impl From<SchemaError> for ConnectorError {
93 fn from(err: SchemaError) -> Self {
94 match err {
95 SchemaError::MissingConfig(msg) => ConnectorError::missing_config(msg),
96 SchemaError::Incompatible(msg) => ConnectorError::SchemaMismatch(msg),
97 SchemaError::DecodeError(msg) => ConnectorError::ReadError(msg),
98 other => ConnectorError::Internal(other.to_string()),
99 }
100 }
101}
102
#[cfg(test)]
mod tests {
    use super::*;

    /// Tuple variants render as `<prefix>: <payload>`.
    #[test]
    fn test_schema_error_display() {
        let rendered = SchemaError::InferenceFailed("too few samples".into()).to_string();
        assert_eq!(rendered, "inference failed: too few samples");
    }

    /// Both the key and the message show up in the rendered text.
    #[test]
    fn test_schema_error_invalid_config() {
        let rendered = SchemaError::InvalidConfig {
            key: "format".into(),
            message: "unknown format 'xml'".into(),
        }
        .to_string();
        assert!(rendered.contains("format"));
        assert!(rendered.contains("unknown format"));
    }

    /// A missing-config connector error lands in `InvalidConfig`.
    #[test]
    fn test_connector_to_schema_error() {
        let converted = SchemaError::from(ConnectorError::missing_config("topic"));
        assert!(matches!(
            &converted,
            SchemaError::InvalidConfig { message, .. } if message.contains("topic")
        ));
    }

    /// `Incompatible` maps onto the connector's `SchemaMismatch`.
    #[test]
    fn test_schema_to_connector_error() {
        let converted =
            ConnectorError::from(SchemaError::Incompatible("field type mismatch".into()));
        assert!(matches!(converted, ConnectorError::SchemaMismatch(_)));
    }

    /// Arrow errors convert via `#[from]` and keep their message.
    #[test]
    fn test_schema_error_from_arrow() {
        let wrapped = SchemaError::from(arrow_schema::ArrowError::SchemaError(
            "bad schema".into(),
        ));
        assert!(matches!(wrapped, SchemaError::Arrow(_)));
        assert!(wrapped.to_string().contains("bad schema"));
    }

    /// Connector errors without a dedicated mapping are boxed into `Other`.
    #[test]
    fn test_other_connector_error_wraps() {
        let wrapped = SchemaError::from(ConnectorError::ConnectionFailed("host down".into()));
        assert!(matches!(wrapped, SchemaError::Other(_)));
        assert!(wrapped.to_string().contains("host down"));
    }

    /// Each wildcard variant mentions its distinguishing phrase.
    #[test]
    fn test_wildcard_errors_display() {
        let cases = [
            (SchemaError::DuplicateWildcard, "duplicate wildcard"),
            (SchemaError::WildcardWithoutResolution, "schema provider"),
            (SchemaError::WildcardPrefixCollision("src_id".into()), "src_id"),
            (SchemaError::WildcardNoNewFields, "zero new columns"),
        ];
        for (error, needle) in &cases {
            assert!(error.to_string().contains(*needle));
        }
    }
}