laminar_connectors/
error.rs1use thiserror::Error;
8
9#[derive(Debug, Error)]
11pub enum ConnectorError {
12 #[error("connection failed: {0}")]
14 ConnectionFailed(String),
15
16 #[error("authentication failed: {0}")]
18 AuthenticationFailed(String),
19
20 #[error("configuration error: {0}")]
22 ConfigurationError(String),
23
24 #[error("missing required config: {0}")]
26 MissingConfig(String),
27
28 #[error("read error: {0}")]
30 ReadError(String),
31
32 #[error("write error: {0}")]
34 WriteError(String),
35
36 #[error("serde error: {0}")]
38 Serde(#[from] SerdeError),
39
40 #[error("checkpoint error: {0}")]
42 CheckpointError(String),
43
44 #[error("transaction error: {0}")]
46 TransactionError(String),
47
48 #[error("invalid state: expected {expected}, got {actual}")]
50 InvalidState {
51 expected: String,
53 actual: String,
55 },
56
57 #[error("schema mismatch: {0}")]
59 SchemaMismatch(String),
60
61 #[error("timeout after {0}ms")]
63 Timeout(u64),
64
65 #[error("connector closed")]
67 Closed,
68
69 #[error("unsupported operation: {0}")]
71 UnsupportedOperation(String),
72
73 #[error("internal error: {0}")]
75 Internal(String),
76
77 #[error("I/O error: {0}")]
79 Io(#[from] std::io::Error),
80}
81
82#[derive(Debug, Error)]
84pub enum SerdeError {
85 #[error("JSON error: {0}")]
87 Json(String),
88
89 #[error("CSV error: {0}")]
91 Csv(String),
92
93 #[error("unsupported format: {0}")]
95 UnsupportedFormat(String),
96
97 #[error("missing field: {0}")]
99 MissingField(String),
100
101 #[error("type conversion error: field '{field}', expected {expected}: {message}")]
103 TypeConversion {
104 field: String,
106 expected: String,
108 message: String,
110 },
111
112 #[error("malformed input: {0}")]
114 MalformedInput(String),
115
116 #[error("schema not found: schema ID {schema_id}")]
118 SchemaNotFound {
119 schema_id: i32,
121 },
122
123 #[error("invalid Confluent header: expected 0x{expected:02x}, got 0x{got:02x}")]
125 InvalidConfluentHeader {
126 expected: u8,
128 got: u8,
130 },
131
132 #[error("schema incompatible: subject '{subject}': {message}")]
134 SchemaIncompatible {
135 subject: String,
137 message: String,
139 },
140
141 #[error("Avro decode error: column '{column}' (avro type '{avro_type}'): {message}")]
143 AvroDecodeError {
144 column: String,
146 avro_type: String,
148 message: String,
150 },
151
152 #[error("record count mismatch: expected {expected}, got {got}")]
154 RecordCountMismatch {
155 expected: usize,
157 got: usize,
159 },
160}
161
162impl From<serde_json::Error> for SerdeError {
163 fn from(e: serde_json::Error) -> Self {
164 SerdeError::Json(e.to_string())
165 }
166}
167
/// Unit tests pinning the rendered messages and `From` conversions of the
/// error types in this module.
///
/// Messages are compared with `assert_eq!` where the full text is determined
/// by this file, so a swap of two fields in a format string is caught rather
/// than slipping past a substring check.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_connector_error_display() {
        let err = ConnectorError::ConnectionFailed("host unreachable".into());
        assert_eq!(err.to_string(), "connection failed: host unreachable");
    }

    #[test]
    fn test_serde_error_from_json() {
        let parsed: Result<serde_json::Value, _> = serde_json::from_str("{bad json");
        let serde_err: SerdeError = parsed.unwrap_err().into();
        assert!(matches!(serde_err, SerdeError::Json(_)));
        // The tail of the message belongs to serde_json, so only the prefix is pinned.
        assert!(serde_err.to_string().starts_with("JSON error: "));
    }

    #[test]
    fn test_serde_error_into_connector_error() {
        let serde_err = SerdeError::MissingField("timestamp".into());
        let conn_err: ConnectorError = serde_err.into();
        assert!(matches!(conn_err, ConnectorError::Serde(_)));
        assert_eq!(
            conn_err.to_string(),
            "serde error: missing field: timestamp"
        );
    }

    #[test]
    fn test_io_error_into_connector_error() {
        let io_err = std::io::Error::new(std::io::ErrorKind::NotFound, "no such file");
        let conn_err: ConnectorError = io_err.into();
        assert!(matches!(conn_err, ConnectorError::Io(_)));
        assert_eq!(conn_err.to_string(), "I/O error: no such file");
    }

    #[test]
    fn test_invalid_state_error() {
        let err = ConnectorError::InvalidState {
            expected: "Running".into(),
            actual: "Closed".into(),
        };
        assert_eq!(
            err.to_string(),
            "invalid state: expected Running, got Closed"
        );
    }

    #[test]
    fn test_timeout_and_closed_display() {
        assert_eq!(
            ConnectorError::Timeout(1500).to_string(),
            "timeout after 1500ms"
        );
        assert_eq!(ConnectorError::Closed.to_string(), "connector closed");
    }

    #[test]
    fn test_schema_not_found_error() {
        let err = SerdeError::SchemaNotFound { schema_id: 42 };
        assert_eq!(err.to_string(), "schema not found: schema ID 42");
    }

    #[test]
    fn test_invalid_confluent_header_error() {
        let err = SerdeError::InvalidConfluentHeader {
            expected: 0x00,
            got: 0xFF,
        };
        // `{:02x}` renders zero-padded, lowercase hex.
        assert_eq!(
            err.to_string(),
            "invalid Confluent header: expected 0x00, got 0xff"
        );
    }

    #[test]
    fn test_schema_incompatible_error() {
        let err = SerdeError::SchemaIncompatible {
            subject: "orders-value".into(),
            message: "READER_FIELD_MISSING_DEFAULT_VALUE".into(),
        };
        assert_eq!(
            err.to_string(),
            "schema incompatible: subject 'orders-value': READER_FIELD_MISSING_DEFAULT_VALUE"
        );
    }

    #[test]
    fn test_avro_decode_error() {
        let err = SerdeError::AvroDecodeError {
            column: "price".into(),
            avro_type: "double".into(),
            message: "unexpected null".into(),
        };
        assert_eq!(
            err.to_string(),
            "Avro decode error: column 'price' (avro type 'double'): unexpected null"
        );
    }

    #[test]
    fn test_record_count_mismatch_error() {
        let err = SerdeError::RecordCountMismatch {
            expected: 5,
            got: 3,
        };
        assert_eq!(
            err.to_string(),
            "record count mismatch: expected 5, got 3"
        );
    }
}