Skip to main content

laminar_connectors/
error.rs

1use thiserror::Error;
2
3/// Errors that can occur during connector operations.
4///
5/// Callers that need to distinguish "retry may work" from "propagate"
6/// should use [`ConnectorError::is_transient`] rather than matching
7/// variants directly — the variant set has changed in the past.
8#[derive(Debug, Error)]
9pub enum ConnectorError {
10    /// Failed to connect to the external system (network error, DNS
11    /// failure, TLS negotiation failure, auth rejection).
12    #[error("connection failed: {0}")]
13    ConnectionFailed(String),
14
15    /// Invalid, missing, or contradictory connector configuration.
16    #[error("configuration error: {0}")]
17    ConfigurationError(String),
18
19    /// Error reading data from a source.
20    #[error("read error: {0}")]
21    ReadError(String),
22
23    /// Error writing data to a sink.
24    #[error("write error: {0}")]
25    WriteError(String),
26
27    /// Serialization or deserialization error.
28    #[error("serde error: {0}")]
29    Serde(#[from] SerdeError),
30
31    /// Transaction error (begin/commit/rollback).
32    ///
33    /// Kept separate from [`Self::WriteError`] because transactional
34    /// failures are classified as **non-transient** by default; a write
35    /// error is transient. Per-connector retry policy can override, but
36    /// the default must not loop forever on bad transactional state.
37    #[error("transaction error: {0}")]
38    TransactionError(String),
39
40    /// The connector is not in the expected state.
41    #[error("invalid state: expected {expected}, got {actual}")]
42    InvalidState {
43        /// The expected state.
44        expected: String,
45        /// The actual state.
46        actual: String,
47    },
48
49    /// Schema mismatch between expected and actual data.
50    #[error("schema mismatch: {0}")]
51    SchemaMismatch(String),
52
53    /// Operation timed out.
54    #[error("timeout after {0}ms")]
55    Timeout(u64),
56
57    /// The connector has been closed.
58    #[error("connector closed")]
59    Closed,
60
61    /// An internal error that doesn't fit other categories.
62    #[error("internal error: {0}")]
63    Internal(String),
64
65    /// An I/O error from the underlying system.
66    #[error("I/O error: {0}")]
67    Io(#[from] std::io::Error),
68}
69
70impl From<laminar_core::lookup::source::LookupError> for ConnectorError {
71    fn from(err: laminar_core::lookup::source::LookupError) -> Self {
72        use laminar_core::lookup::source::LookupError;
73        match err {
74            LookupError::Connection(m) => Self::ConnectionFailed(m),
75            LookupError::Query(m) => Self::ReadError(m),
76            LookupError::Timeout(d) =>
77            {
78                #[allow(clippy::cast_possible_truncation)]
79                Self::Timeout(d.as_millis() as u64)
80            }
81            LookupError::NotAvailable(m) => Self::InvalidState {
82                expected: "lookup source available".into(),
83                actual: m,
84            },
85            LookupError::Internal(m) => Self::Internal(m),
86        }
87    }
88}
89
90impl ConnectorError {
91    /// Construct a "missing required config" error. Thin helper around
92    /// [`Self::ConfigurationError`] so every "missing required config:
93    /// {key}" message is shaped the same way.
94    #[must_use]
95    pub fn missing_config(key: impl Into<String>) -> Self {
96        Self::ConfigurationError(format!("missing required config: {}", key.into()))
97    }
98
99    /// Returns `true` if this error is likely transient and the operation
100    /// may succeed on retry (e.g., network timeout, throttled request).
101    /// Returns `false` for configuration, schema, and state errors that
102    /// will not resolve without user intervention.
103    #[must_use]
104    pub fn is_transient(&self) -> bool {
105        match self {
106            Self::ReadError(_)
107            | Self::WriteError(_)
108            | Self::Timeout(_)
109            | Self::Io(_)
110            | Self::ConnectionFailed(_) => true,
111
112            Self::ConfigurationError(_)
113            | Self::SchemaMismatch(_)
114            | Self::InvalidState { .. }
115            | Self::TransactionError(_)
116            | Self::Serde(_)
117            | Self::Closed
118            | Self::Internal(_) => false,
119        }
120    }
121}
122
123/// Errors that occur during record serialization or deserialization.
124#[derive(Debug, Error)]
125pub enum SerdeError {
126    /// JSON parsing or encoding error.
127    #[error("JSON error: {0}")]
128    Json(String),
129
130    /// CSV parsing or encoding error.
131    #[error("CSV error: {0}")]
132    Csv(String),
133
134    /// The data format is not supported.
135    #[error("unsupported format: {0}")]
136    UnsupportedFormat(String),
137
138    /// A required field is missing from the input.
139    #[error("missing field: {0}")]
140    MissingField(String),
141
142    /// A field value could not be converted to the target Arrow type.
143    #[error("type conversion error: field '{field}', expected {expected}: {message}")]
144    TypeConversion {
145        /// The field name.
146        field: String,
147        /// The expected Arrow data type.
148        expected: String,
149        /// Details about the conversion failure.
150        message: String,
151    },
152
153    /// The input data is malformed.
154    #[error("malformed input: {0}")]
155    MalformedInput(String),
156
157    /// Schema ID not found in registry.
158    #[error("schema not found: schema ID {schema_id}")]
159    SchemaNotFound {
160        /// The schema ID that was not found.
161        schema_id: i32,
162    },
163
164    /// Confluent wire format magic byte mismatch.
165    #[error("invalid Confluent header: expected 0x{expected:02x}, got 0x{got:02x}")]
166    InvalidConfluentHeader {
167        /// Expected magic byte (0x00).
168        expected: u8,
169        /// Actual byte found.
170        got: u8,
171    },
172
173    /// Schema incompatible with existing version in the registry.
174    #[error("schema incompatible: subject '{subject}': {message}")]
175    SchemaIncompatible {
176        /// The Schema Registry subject name.
177        subject: String,
178        /// Incompatibility details.
179        message: String,
180    },
181
182    /// Avro decode failure for a specific column.
183    #[error("Avro decode error: column '{column}' (avro type '{avro_type}'): {message}")]
184    AvroDecodeError {
185        /// The column that failed to decode.
186        column: String,
187        /// The Avro type being decoded.
188        avro_type: String,
189        /// The decode failure details.
190        message: String,
191    },
192
193    /// Record count mismatch after serialization.
194    #[error("record count mismatch: expected {expected}, got {got}")]
195    RecordCountMismatch {
196        /// Expected number of records.
197        expected: usize,
198        /// Actual number of records produced.
199        got: usize,
200    },
201}
202
203impl From<serde_json::Error> for SerdeError {
204    fn from(e: serde_json::Error) -> Self {
205        SerdeError::Json(e.to_string())
206    }
207}
208
209#[cfg(test)]
210mod tests {
211    use super::*;
212
213    #[test]
214    fn test_connector_error_display() {
215        let err = ConnectorError::ConnectionFailed("host unreachable".into());
216        assert_eq!(err.to_string(), "connection failed: host unreachable");
217    }
218
219    #[test]
220    fn test_serde_error_from_json() {
221        let json_err: Result<serde_json::Value, _> = serde_json::from_str("{bad json");
222        let serde_err: SerdeError = json_err.unwrap_err().into();
223        assert!(matches!(serde_err, SerdeError::Json(_)));
224    }
225
226    #[test]
227    fn test_serde_error_into_connector_error() {
228        let serde_err = SerdeError::MissingField("timestamp".into());
229        let conn_err: ConnectorError = serde_err.into();
230        assert!(matches!(conn_err, ConnectorError::Serde(_)));
231        assert!(conn_err.to_string().contains("timestamp"));
232    }
233
234    #[test]
235    fn test_invalid_state_error() {
236        let err = ConnectorError::InvalidState {
237            expected: "Running".into(),
238            actual: "Closed".into(),
239        };
240        assert!(err.to_string().contains("Running"));
241        assert!(err.to_string().contains("Closed"));
242    }
243
244    #[test]
245    fn test_schema_not_found_error() {
246        let err = SerdeError::SchemaNotFound { schema_id: 42 };
247        assert!(err.to_string().contains("42"));
248        assert!(err.to_string().contains("schema not found"));
249    }
250
251    #[test]
252    fn test_invalid_confluent_header_error() {
253        let err = SerdeError::InvalidConfluentHeader {
254            expected: 0x00,
255            got: 0xFF,
256        };
257        let msg = err.to_string();
258        assert!(msg.contains("0x00"));
259        assert!(msg.contains("0xff"));
260    }
261
262    #[test]
263    fn test_schema_incompatible_error() {
264        let err = SerdeError::SchemaIncompatible {
265            subject: "orders-value".into(),
266            message: "READER_FIELD_MISSING_DEFAULT_VALUE".into(),
267        };
268        let msg = err.to_string();
269        assert!(msg.contains("orders-value"));
270        assert!(msg.contains("READER_FIELD_MISSING_DEFAULT_VALUE"));
271    }
272
273    #[test]
274    fn test_avro_decode_error() {
275        let err = SerdeError::AvroDecodeError {
276            column: "price".into(),
277            avro_type: "double".into(),
278            message: "unexpected null".into(),
279        };
280        let msg = err.to_string();
281        assert!(msg.contains("price"));
282        assert!(msg.contains("double"));
283        assert!(msg.contains("unexpected null"));
284    }
285
286    #[test]
287    fn test_record_count_mismatch_error() {
288        let err = SerdeError::RecordCountMismatch {
289            expected: 5,
290            got: 3,
291        };
292        let msg = err.to_string();
293        assert!(msg.contains('5'));
294        assert!(msg.contains('3'));
295    }
296}