// laminar_connectors/error.rs

//! Connector SDK error types.
//!
//! Provides a unified error hierarchy for all connector operations:
//! - `ConnectorError`: Top-level error for source/sink connector operations
//! - `SerdeError`: Serialization/deserialization errors

use thiserror::Error;

9/// Errors that can occur during connector operations.
10#[derive(Debug, Error)]
11pub enum ConnectorError {
12    /// Failed to connect to the external system.
13    #[error("connection failed: {0}")]
14    ConnectionFailed(String),
15
16    /// Authentication or authorization error.
17    #[error("authentication failed: {0}")]
18    AuthenticationFailed(String),
19
20    /// Invalid connector configuration.
21    #[error("configuration error: {0}")]
22    ConfigurationError(String),
23
24    /// Required configuration key is missing.
25    #[error("missing required config: {0}")]
26    MissingConfig(String),
27
28    /// Error reading data from a source.
29    #[error("read error: {0}")]
30    ReadError(String),
31
32    /// Error writing data to a sink.
33    #[error("write error: {0}")]
34    WriteError(String),
35
36    /// Serialization or deserialization error.
37    #[error("serde error: {0}")]
38    Serde(#[from] SerdeError),
39
40    /// Checkpoint or offset commit error.
41    #[error("checkpoint error: {0}")]
42    CheckpointError(String),
43
44    /// Transaction error (begin/commit/rollback).
45    #[error("transaction error: {0}")]
46    TransactionError(String),
47
48    /// The connector is not in the expected state.
49    #[error("invalid state: expected {expected}, got {actual}")]
50    InvalidState {
51        /// The expected state.
52        expected: String,
53        /// The actual state.
54        actual: String,
55    },
56
57    /// Schema mismatch between expected and actual data.
58    #[error("schema mismatch: {0}")]
59    SchemaMismatch(String),
60
61    /// Operation timed out.
62    #[error("timeout after {0}ms")]
63    Timeout(u64),
64
65    /// The connector has been closed.
66    #[error("connector closed")]
67    Closed,
68
69    /// The requested operation is not yet implemented.
70    #[error("unsupported operation: {0}")]
71    UnsupportedOperation(String),
72
73    /// An internal error that doesn't fit other categories.
74    #[error("internal error: {0}")]
75    Internal(String),
76
77    /// An I/O error from the underlying system.
78    #[error("I/O error: {0}")]
79    Io(#[from] std::io::Error),
80}
81
82/// Errors that occur during record serialization or deserialization.
83#[derive(Debug, Error)]
84pub enum SerdeError {
85    /// JSON parsing or encoding error.
86    #[error("JSON error: {0}")]
87    Json(String),
88
89    /// CSV parsing or encoding error.
90    #[error("CSV error: {0}")]
91    Csv(String),
92
93    /// The data format is not supported.
94    #[error("unsupported format: {0}")]
95    UnsupportedFormat(String),
96
97    /// A required field is missing from the input.
98    #[error("missing field: {0}")]
99    MissingField(String),
100
101    /// A field value could not be converted to the target Arrow type.
102    #[error("type conversion error: field '{field}', expected {expected}: {message}")]
103    TypeConversion {
104        /// The field name.
105        field: String,
106        /// The expected Arrow data type.
107        expected: String,
108        /// Details about the conversion failure.
109        message: String,
110    },
111
112    /// The input data is malformed.
113    #[error("malformed input: {0}")]
114    MalformedInput(String),
115
116    /// Schema ID not found in registry.
117    #[error("schema not found: schema ID {schema_id}")]
118    SchemaNotFound {
119        /// The schema ID that was not found.
120        schema_id: i32,
121    },
122
123    /// Confluent wire format magic byte mismatch.
124    #[error("invalid Confluent header: expected 0x{expected:02x}, got 0x{got:02x}")]
125    InvalidConfluentHeader {
126        /// Expected magic byte (0x00).
127        expected: u8,
128        /// Actual byte found.
129        got: u8,
130    },
131
132    /// Schema incompatible with existing version in the registry.
133    #[error("schema incompatible: subject '{subject}': {message}")]
134    SchemaIncompatible {
135        /// The Schema Registry subject name.
136        subject: String,
137        /// Incompatibility details.
138        message: String,
139    },
140
141    /// Avro decode failure for a specific column.
142    #[error("Avro decode error: column '{column}' (avro type '{avro_type}'): {message}")]
143    AvroDecodeError {
144        /// The column that failed to decode.
145        column: String,
146        /// The Avro type being decoded.
147        avro_type: String,
148        /// The decode failure details.
149        message: String,
150    },
151
152    /// Record count mismatch after serialization.
153    #[error("record count mismatch: expected {expected}, got {got}")]
154    RecordCountMismatch {
155        /// Expected number of records.
156        expected: usize,
157        /// Actual number of records produced.
158        got: usize,
159    },
160}
161
162impl From<serde_json::Error> for SerdeError {
163    fn from(e: serde_json::Error) -> Self {
164        SerdeError::Json(e.to_string())
165    }
166}
167
/// Unit tests for the `Display` output and `From` conversions of the
/// connector error types.
#[cfg(test)]
mod tests {
    use super::*;

    /// A tuple variant renders as its fixed prefix followed by the payload.
    #[test]
    fn test_connector_error_display() {
        let err = ConnectorError::ConnectionFailed("host unreachable".into());
        assert_eq!(err.to_string(), "connection failed: host unreachable");
    }

    /// A `serde_json` parse failure converts into `SerdeError::Json`.
    #[test]
    fn test_serde_error_from_json() {
        let parsed: Result<serde_json::Value, _> = serde_json::from_str("{bad json");
        let serde_err: SerdeError = parsed.unwrap_err().into();
        assert!(matches!(serde_err, SerdeError::Json(_)));
    }

    /// A `SerdeError` converts into `ConnectorError::Serde` and keeps its
    /// detail text in the rendered message.
    #[test]
    fn test_serde_error_into_connector_error() {
        let conn_err: ConnectorError = SerdeError::MissingField("timestamp".into()).into();
        assert!(matches!(conn_err, ConnectorError::Serde(_)));
        assert!(conn_err.to_string().contains("timestamp"));
    }

    /// The full message is pinned so that swapped `expected`/`actual`
    /// placeholders would be detected.
    #[test]
    fn test_invalid_state_error() {
        let err = ConnectorError::InvalidState {
            expected: "Running".into(),
            actual: "Closed".into(),
        };
        assert_eq!(
            err.to_string(),
            "invalid state: expected Running, got Closed"
        );
    }

    /// The message names the missing schema ID.
    #[test]
    fn test_schema_not_found_error() {
        let msg = SerdeError::SchemaNotFound { schema_id: 42 }.to_string();
        assert!(msg.contains("42"));
        assert!(msg.contains("schema not found"));
    }

    /// Bytes render as two-digit lowercase hex; the full message is pinned
    /// so that swapped `expected`/`got` placeholders would be detected.
    #[test]
    fn test_invalid_confluent_header_error() {
        let err = SerdeError::InvalidConfluentHeader {
            expected: 0x00,
            got: 0xFF,
        };
        assert_eq!(
            err.to_string(),
            "invalid Confluent header: expected 0x00, got 0xff"
        );
    }

    /// The message includes both the subject and the incompatibility detail.
    #[test]
    fn test_schema_incompatible_error() {
        let msg = SerdeError::SchemaIncompatible {
            subject: "orders-value".into(),
            message: "READER_FIELD_MISSING_DEFAULT_VALUE".into(),
        }
        .to_string();
        assert!(msg.contains("orders-value"));
        assert!(msg.contains("READER_FIELD_MISSING_DEFAULT_VALUE"));
    }

    /// The message includes the column, the Avro type and the detail text.
    #[test]
    fn test_avro_decode_error() {
        let msg = SerdeError::AvroDecodeError {
            column: "price".into(),
            avro_type: "double".into(),
            message: "unexpected null".into(),
        }
        .to_string();
        for needle in ["price", "double", "unexpected null"] {
            assert!(msg.contains(needle));
        }
    }

    /// The full message is pinned so that swapped `expected`/`got`
    /// placeholders would be detected.
    #[test]
    fn test_record_count_mismatch_error() {
        let err = SerdeError::RecordCountMismatch {
            expected: 5,
            got: 3,
        };
        assert_eq!(
            err.to_string(),
            "record count mismatch: expected 5, got 3"
        );
    }
}