laminar_connectors/
error.rs1use thiserror::Error;
2
3#[derive(Debug, Error)]
9pub enum ConnectorError {
10 #[error("connection failed: {0}")]
13 ConnectionFailed(String),
14
15 #[error("configuration error: {0}")]
17 ConfigurationError(String),
18
19 #[error("read error: {0}")]
21 ReadError(String),
22
23 #[error("write error: {0}")]
25 WriteError(String),
26
27 #[error("serde error: {0}")]
29 Serde(#[from] SerdeError),
30
31 #[error("transaction error: {0}")]
38 TransactionError(String),
39
40 #[error("invalid state: expected {expected}, got {actual}")]
42 InvalidState {
43 expected: String,
45 actual: String,
47 },
48
49 #[error("schema mismatch: {0}")]
51 SchemaMismatch(String),
52
53 #[error("timeout after {0}ms")]
55 Timeout(u64),
56
57 #[error("connector closed")]
59 Closed,
60
61 #[error("internal error: {0}")]
63 Internal(String),
64
65 #[error("I/O error: {0}")]
67 Io(#[from] std::io::Error),
68}
69
70impl From<laminar_core::lookup::source::LookupError> for ConnectorError {
71 fn from(err: laminar_core::lookup::source::LookupError) -> Self {
72 use laminar_core::lookup::source::LookupError;
73 match err {
74 LookupError::Connection(m) => Self::ConnectionFailed(m),
75 LookupError::Query(m) => Self::ReadError(m),
76 LookupError::Timeout(d) =>
77 {
78 #[allow(clippy::cast_possible_truncation)]
79 Self::Timeout(d.as_millis() as u64)
80 }
81 LookupError::NotAvailable(m) => Self::InvalidState {
82 expected: "lookup source available".into(),
83 actual: m,
84 },
85 LookupError::Internal(m) => Self::Internal(m),
86 }
87 }
88}
89
90impl ConnectorError {
91 #[must_use]
95 pub fn missing_config(key: impl Into<String>) -> Self {
96 Self::ConfigurationError(format!("missing required config: {}", key.into()))
97 }
98
99 #[must_use]
104 pub fn is_transient(&self) -> bool {
105 match self {
106 Self::ReadError(_)
107 | Self::WriteError(_)
108 | Self::Timeout(_)
109 | Self::Io(_)
110 | Self::ConnectionFailed(_) => true,
111
112 Self::ConfigurationError(_)
113 | Self::SchemaMismatch(_)
114 | Self::InvalidState { .. }
115 | Self::TransactionError(_)
116 | Self::Serde(_)
117 | Self::Closed
118 | Self::Internal(_) => false,
119 }
120 }
121}
122
123#[derive(Debug, Error)]
125pub enum SerdeError {
126 #[error("JSON error: {0}")]
128 Json(String),
129
130 #[error("CSV error: {0}")]
132 Csv(String),
133
134 #[error("unsupported format: {0}")]
136 UnsupportedFormat(String),
137
138 #[error("missing field: {0}")]
140 MissingField(String),
141
142 #[error("type conversion error: field '{field}', expected {expected}: {message}")]
144 TypeConversion {
145 field: String,
147 expected: String,
149 message: String,
151 },
152
153 #[error("malformed input: {0}")]
155 MalformedInput(String),
156
157 #[error("schema not found: schema ID {schema_id}")]
159 SchemaNotFound {
160 schema_id: i32,
162 },
163
164 #[error("invalid Confluent header: expected 0x{expected:02x}, got 0x{got:02x}")]
166 InvalidConfluentHeader {
167 expected: u8,
169 got: u8,
171 },
172
173 #[error("schema incompatible: subject '{subject}': {message}")]
175 SchemaIncompatible {
176 subject: String,
178 message: String,
180 },
181
182 #[error("Avro decode error: column '{column}' (avro type '{avro_type}'): {message}")]
184 AvroDecodeError {
185 column: String,
187 avro_type: String,
189 message: String,
191 },
192
193 #[error("record count mismatch: expected {expected}, got {got}")]
195 RecordCountMismatch {
196 expected: usize,
198 got: usize,
200 },
201}
202
203impl From<serde_json::Error> for SerdeError {
204 fn from(e: serde_json::Error) -> Self {
205 SerdeError::Json(e.to_string())
206 }
207}
208
#[cfg(test)]
mod tests {
    use super::*;

    /// `ConnectionFailed` renders its fixed prefix followed by the detail text.
    #[test]
    fn test_connector_error_display() {
        let rendered = ConnectorError::ConnectionFailed("host unreachable".into()).to_string();
        assert_eq!(rendered, "connection failed: host unreachable");
    }

    /// A `serde_json` parse failure converts into `SerdeError::Json`.
    #[test]
    fn test_serde_error_from_json() {
        let parse_failure = serde_json::from_str::<serde_json::Value>("{bad json").unwrap_err();
        let converted = SerdeError::from(parse_failure);
        assert!(matches!(converted, SerdeError::Json(_)));
    }

    /// `SerdeError` converts into `ConnectorError::Serde` and keeps its detail text.
    #[test]
    fn test_serde_error_into_connector_error() {
        let wrapped = ConnectorError::from(SerdeError::MissingField("timestamp".into()));
        assert!(matches!(wrapped, ConnectorError::Serde(_)));
        assert!(wrapped.to_string().contains("timestamp"));
    }

    /// `InvalidState` mentions both the expected and the actual state.
    #[test]
    fn test_invalid_state_error() {
        let state_error = ConnectorError::InvalidState {
            expected: "Running".into(),
            actual: "Closed".into(),
        };
        for needle in ["Running", "Closed"] {
            assert!(state_error.to_string().contains(needle));
        }
    }

    /// `SchemaNotFound` mentions the schema ID and the fixed prefix.
    #[test]
    fn test_schema_not_found_error() {
        let not_found = SerdeError::SchemaNotFound { schema_id: 42 };
        assert!(not_found.to_string().contains("42"));
        assert!(not_found.to_string().contains("schema not found"));
    }

    /// `InvalidConfluentHeader` renders both bytes as lowercase two-digit hex.
    #[test]
    fn test_invalid_confluent_header_error() {
        let rendered = SerdeError::InvalidConfluentHeader {
            expected: 0x00,
            got: 0xFF,
        }
        .to_string();
        assert!(rendered.contains("0x00"));
        assert!(rendered.contains("0xff"));
    }

    /// `SchemaIncompatible` mentions the subject and the detail text.
    #[test]
    fn test_schema_incompatible_error() {
        let rendered = SerdeError::SchemaIncompatible {
            subject: "orders-value".into(),
            message: "READER_FIELD_MISSING_DEFAULT_VALUE".into(),
        }
        .to_string();
        assert!(rendered.contains("orders-value"));
        assert!(rendered.contains("READER_FIELD_MISSING_DEFAULT_VALUE"));
    }

    /// `AvroDecodeError` mentions the column, the Avro type and the detail text.
    #[test]
    fn test_avro_decode_error() {
        let rendered = SerdeError::AvroDecodeError {
            column: "price".into(),
            avro_type: "double".into(),
            message: "unexpected null".into(),
        }
        .to_string();
        assert!(rendered.contains("price"));
        assert!(rendered.contains("double"));
        assert!(rendered.contains("unexpected null"));
    }

    /// `RecordCountMismatch` mentions both counts.
    #[test]
    fn test_record_count_mismatch_error() {
        let rendered = SerdeError::RecordCountMismatch {
            expected: 5,
            got: 3,
        }
        .to_string();
        assert!(rendered.contains('5'));
        assert!(rendered.contains('3'));
    }
}