Skip to main content

laminar_connectors/schema/
error.rs

1//! Schema error types.
2//!
3//! Provides [`SchemaError`] for schema inference, resolution, and evolution
4//! operations, plus a convenience [`SchemaResult`] alias.
5
6use thiserror::Error;
7
8use crate::error::ConnectorError;
9
10/// Result alias for schema operations.
11pub type SchemaResult<T> = Result<T, SchemaError>;
12
13/// Errors that can occur during schema operations.
14#[derive(Debug, Error)]
15pub enum SchemaError {
16    /// Schema inference failed (e.g., not enough samples, conflicting types).
17    #[error("inference failed: {0}")]
18    InferenceFailed(String),
19
20    /// Two schemas are incompatible and cannot be merged.
21    #[error("incompatible schemas: {0}")]
22    Incompatible(String),
23
24    /// Error communicating with a schema registry.
25    #[error("registry error: {0}")]
26    RegistryError(String),
27
28    /// Error decoding raw data into Arrow records.
29    #[error("decode error: {0}")]
30    DecodeError(String),
31
32    /// A proposed schema evolution was rejected by compatibility rules.
33    #[error("evolution rejected: {0}")]
34    EvolutionRejected(String),
35
36    /// A required configuration key is missing.
37    #[error("missing config: {0}")]
38    MissingConfig(String),
39
40    /// A configuration value is invalid.
41    #[error("invalid config key '{key}': {message}")]
42    InvalidConfig {
43        /// The configuration key.
44        key: String,
45        /// What was wrong with the value.
46        message: String,
47    },
48
49    /// Duplicate wildcard `*` in the column list.
50    #[error("duplicate wildcard: only one `*` is allowed in the column list")]
51    DuplicateWildcard,
52
53    /// Wildcard `*` used without a connector that provides schema resolution.
54    #[error(
55        "wildcard without resolution: `*` requires a connector with a schema provider or registry"
56    )]
57    WildcardWithoutResolution,
58
59    /// A wildcard-prefixed column name collides with a declared column.
60    #[error("wildcard prefix collision: prefixed column '{0}' collides with a declared column")]
61    WildcardPrefixCollision(String),
62
63    /// Wildcard expanded to zero new columns (all source columns were
64    /// already declared).
65    #[error("wildcard expanded to zero new columns: all source columns are already declared")]
66    WildcardNoNewFields,
67
68    /// An Arrow error propagated from schema operations.
69    #[error("arrow error: {0}")]
70    Arrow(#[from] arrow_schema::ArrowError),
71
72    /// Catch-all for wrapped external errors.
73    #[error(transparent)]
74    Other(Box<dyn std::error::Error + Send + Sync>),
75}
76
77impl From<ConnectorError> for SchemaError {
78    fn from(err: ConnectorError) -> Self {
79        match err {
80            // `ConnectorError::MissingConfig` folded into
81            // `ConfigurationError` — both land in `InvalidConfig` now.
82            ConnectorError::ConfigurationError(msg) => SchemaError::InvalidConfig {
83                key: String::new(),
84                message: msg,
85            },
86            ConnectorError::SchemaMismatch(msg) => SchemaError::Incompatible(msg),
87            other => SchemaError::Other(Box::new(other)),
88        }
89    }
90}
91
92impl From<SchemaError> for ConnectorError {
93    fn from(err: SchemaError) -> Self {
94        match err {
95            SchemaError::MissingConfig(msg) => ConnectorError::missing_config(msg),
96            SchemaError::Incompatible(msg) => ConnectorError::SchemaMismatch(msg),
97            SchemaError::DecodeError(msg) => ConnectorError::ReadError(msg),
98            other => ConnectorError::Internal(other.to_string()),
99        }
100    }
101}
102
#[cfg(test)]
mod tests {
    use super::*;

    /// A tuple variant renders as its prefix followed by the payload.
    #[test]
    fn test_schema_error_display() {
        let rendered = SchemaError::InferenceFailed("too few samples".into()).to_string();
        assert_eq!(rendered, "inference failed: too few samples");
    }

    /// `InvalidConfig` mentions both the key and the message.
    #[test]
    fn test_schema_error_invalid_config() {
        let rendered = SchemaError::InvalidConfig {
            key: "format".into(),
            message: "unknown format 'xml'".into(),
        }
        .to_string();
        assert!(rendered.contains("format"));
        assert!(rendered.contains("unknown format"));
    }

    /// `missing_config` lands in `ConfigurationError`, which maps to
    /// `SchemaError::InvalidConfig` with the full message preserved.
    #[test]
    fn test_connector_to_schema_error() {
        let converted = SchemaError::from(ConnectorError::missing_config("topic"));
        assert!(matches!(
            &converted,
            SchemaError::InvalidConfig { message, .. } if message.contains("topic")
        ));
    }

    /// `Incompatible` lowers to `ConnectorError::SchemaMismatch`.
    #[test]
    fn test_schema_to_connector_error() {
        let lowered =
            ConnectorError::from(SchemaError::Incompatible("field type mismatch".into()));
        assert!(matches!(lowered, ConnectorError::SchemaMismatch(_)));
    }

    /// Arrow errors convert through `#[from]` and keep their message.
    #[test]
    fn test_schema_error_from_arrow() {
        let wrapped =
            SchemaError::from(arrow_schema::ArrowError::SchemaError("bad schema".into()));
        assert!(matches!(wrapped, SchemaError::Arrow(_)));
        assert!(wrapped.to_string().contains("bad schema"));
    }

    /// Connector errors without a dedicated mapping are boxed into `Other`.
    #[test]
    fn test_other_connector_error_wraps() {
        let wrapped = SchemaError::from(ConnectorError::ConnectionFailed("host down".into()));
        assert!(matches!(wrapped, SchemaError::Other(_)));
        assert!(wrapped.to_string().contains("host down"));
    }

    /// Each wildcard variant's message contains its identifying phrase.
    #[test]
    fn test_wildcard_errors_display() {
        let cases = [
            (SchemaError::DuplicateWildcard, "duplicate wildcard"),
            (SchemaError::WildcardWithoutResolution, "schema provider"),
            (
                SchemaError::WildcardPrefixCollision("src_id".into()),
                "src_id",
            ),
            (SchemaError::WildcardNoNewFields, "zero new columns"),
        ];
        for (err, needle) in cases {
            assert!(err.to_string().contains(needle));
        }
    }
}