Skip to main content

laminar_connectors/schema/
error.rs

1//! Schema error types.
2//!
3//! Provides [`SchemaError`] for schema inference, resolution, and evolution
4//! operations, plus a convenience [`SchemaResult`] alias.
5
6use thiserror::Error;
7
8use crate::error::ConnectorError;
9
10/// Result alias for schema operations.
11pub type SchemaResult<T> = Result<T, SchemaError>;
12
13/// Errors that can occur during schema operations.
14#[derive(Debug, Error)]
15pub enum SchemaError {
16    /// Schema inference failed (e.g., not enough samples, conflicting types).
17    #[error("inference failed: {0}")]
18    InferenceFailed(String),
19
20    /// Two schemas are incompatible and cannot be merged.
21    #[error("incompatible schemas: {0}")]
22    Incompatible(String),
23
24    /// Error communicating with a schema registry.
25    #[error("registry error: {0}")]
26    RegistryError(String),
27
28    /// Error decoding raw data into Arrow records.
29    #[error("decode error: {0}")]
30    DecodeError(String),
31
32    /// A proposed schema evolution was rejected by compatibility rules.
33    #[error("evolution rejected: {0}")]
34    EvolutionRejected(String),
35
36    /// A required configuration key is missing.
37    #[error("missing config: {0}")]
38    MissingConfig(String),
39
40    /// A configuration value is invalid.
41    #[error("invalid config key '{key}': {message}")]
42    InvalidConfig {
43        /// The configuration key.
44        key: String,
45        /// What was wrong with the value.
46        message: String,
47    },
48
49    /// Duplicate wildcard `*` in the column list.
50    #[error("duplicate wildcard: only one `*` is allowed in the column list")]
51    DuplicateWildcard,
52
53    /// Wildcard `*` used without a connector that supports schema inference.
54    #[error("wildcard without inference: `*` requires a connector that supports schema inference or a schema registry")]
55    WildcardWithoutInference,
56
57    /// A wildcard-prefixed column name collides with a declared column.
58    #[error("wildcard prefix collision: prefixed column '{0}' collides with a declared column")]
59    WildcardPrefixCollision(String),
60
61    /// Wildcard expanded to zero new columns (all source columns were
62    /// already declared).
63    #[error("wildcard expanded to zero new columns: all source columns are already declared")]
64    WildcardNoNewFields,
65
66    /// An Arrow error propagated from schema operations.
67    #[error("arrow error: {0}")]
68    Arrow(#[from] arrow_schema::ArrowError),
69
70    /// Catch-all for wrapped external errors.
71    #[error(transparent)]
72    Other(Box<dyn std::error::Error + Send + Sync>),
73}
74
75impl From<ConnectorError> for SchemaError {
76    fn from(err: ConnectorError) -> Self {
77        match err {
78            ConnectorError::MissingConfig(msg) => SchemaError::MissingConfig(msg),
79            ConnectorError::ConfigurationError(msg) => SchemaError::InvalidConfig {
80                key: String::new(),
81                message: msg,
82            },
83            ConnectorError::SchemaMismatch(msg) => SchemaError::Incompatible(msg),
84            other => SchemaError::Other(Box::new(other)),
85        }
86    }
87}
88
89impl From<SchemaError> for ConnectorError {
90    fn from(err: SchemaError) -> Self {
91        match err {
92            SchemaError::MissingConfig(msg) => ConnectorError::MissingConfig(msg),
93            SchemaError::Incompatible(msg) => ConnectorError::SchemaMismatch(msg),
94            SchemaError::DecodeError(msg) => ConnectorError::ReadError(msg),
95            other => ConnectorError::Internal(other.to_string()),
96        }
97    }
98}
99
100#[cfg(test)]
101mod tests {
102    use super::*;
103
104    #[test]
105    fn test_schema_error_display() {
106        let err = SchemaError::InferenceFailed("too few samples".into());
107        assert_eq!(err.to_string(), "inference failed: too few samples");
108    }
109
110    #[test]
111    fn test_schema_error_invalid_config() {
112        let err = SchemaError::InvalidConfig {
113            key: "format".into(),
114            message: "unknown format 'xml'".into(),
115        };
116        assert!(err.to_string().contains("format"));
117        assert!(err.to_string().contains("unknown format"));
118    }
119
120    #[test]
121    fn test_connector_to_schema_error() {
122        let ce = ConnectorError::MissingConfig("topic".into());
123        let se: SchemaError = ce.into();
124        assert!(matches!(se, SchemaError::MissingConfig(ref m) if m == "topic"));
125    }
126
127    #[test]
128    fn test_schema_to_connector_error() {
129        let se = SchemaError::Incompatible("field type mismatch".into());
130        let ce: ConnectorError = se.into();
131        assert!(matches!(ce, ConnectorError::SchemaMismatch(_)));
132    }
133
134    #[test]
135    fn test_schema_error_from_arrow() {
136        let arrow_err = arrow_schema::ArrowError::SchemaError("bad schema".into());
137        let se: SchemaError = arrow_err.into();
138        assert!(matches!(se, SchemaError::Arrow(_)));
139        assert!(se.to_string().contains("bad schema"));
140    }
141
142    #[test]
143    fn test_other_connector_error_wraps() {
144        let ce = ConnectorError::ConnectionFailed("host down".into());
145        let se: SchemaError = ce.into();
146        assert!(matches!(se, SchemaError::Other(_)));
147        assert!(se.to_string().contains("host down"));
148    }
149
150    #[test]
151    fn test_wildcard_errors_display() {
152        let e1 = SchemaError::DuplicateWildcard;
153        assert!(e1.to_string().contains("duplicate wildcard"));
154
155        let e2 = SchemaError::WildcardWithoutInference;
156        assert!(e2.to_string().contains("schema inference"));
157
158        let e3 = SchemaError::WildcardPrefixCollision("src_id".into());
159        assert!(e3.to_string().contains("src_id"));
160
161        let e4 = SchemaError::WildcardNoNewFields;
162        assert!(e4.to_string().contains("zero new columns"));
163    }
164}