Skip to main content

laminar_core/streaming/
error.rs

1//! Streaming error types.
2
3use std::fmt;
4
/// Failures that streaming operations can report.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamingError {
    /// The channel is full while the backpressure strategy is `Error`.
    ChannelFull,

    /// The channel is closed: every receiver has been dropped.
    ChannelClosed,

    /// The channel is disconnected: every sender has been dropped.
    Disconnected,

    /// The supplied configuration is invalid; the payload describes why.
    InvalidConfig(String),

    /// The schema seen during a `push_arrow` operation does not match.
    SchemaMismatch {
        /// Field names of the expected schema.
        expected: Vec<String>,
        /// Field names of the schema that was actually provided.
        actual: Vec<String>,
    },

    /// The operation timed out.
    Timeout,

    /// An internal error; the payload carries the message.
    Internal(String),
}
34
35impl fmt::Display for StreamingError {
36    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
37        match self {
38            Self::ChannelFull => write!(f, "channel is full"),
39            Self::ChannelClosed => write!(f, "channel is closed"),
40            Self::Disconnected => write!(f, "channel is disconnected"),
41            Self::InvalidConfig(msg) => write!(f, "invalid configuration: {msg}"),
42            Self::SchemaMismatch { expected, actual } => {
43                write!(f, "schema mismatch: expected {expected:?}, got {actual:?}")
44            }
45            Self::Timeout => write!(f, "operation timed out"),
46            Self::Internal(msg) => write!(f, "internal error: {msg}"),
47        }
48    }
49}
50
51impl std::error::Error for StreamingError {}
52
53/// Error returned from `try_push` operations.
54#[derive(Debug)]
55pub struct TryPushError<T> {
56    /// The value that could not be pushed.
57    pub value: T,
58    /// The error that occurred.
59    pub error: StreamingError,
60}
61
62impl<T> TryPushError<T> {
63    /// Creates a new error indicating the channel is full.
64    #[must_use]
65    pub fn full(value: T) -> Self {
66        Self {
67            value,
68            error: StreamingError::ChannelFull,
69        }
70    }
71
72    /// Creates a new error indicating the receiver was dropped.
73    #[must_use]
74    pub fn disconnected(value: T) -> Self {
75        Self {
76            value,
77            error: StreamingError::Disconnected,
78        }
79    }
80
81    /// Consumes the error and returns the value that could not be pushed.
82    #[must_use]
83    pub fn into_inner(self) -> T {
84        self.value
85    }
86}
87
88impl<T: fmt::Debug> fmt::Display for TryPushError<T> {
89    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
90        write!(f, "try_push failed: {}", self.error)
91    }
92}
93
94impl<T: fmt::Debug> std::error::Error for TryPushError<T> {}
95
/// Failure produced by a `recv` operation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RecvError {
    /// The channel is disconnected: every sender has been dropped.
    Disconnected,

    /// The operation timed out.
    Timeout,
}
105
106impl fmt::Display for RecvError {
107    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
108        match self {
109            Self::Disconnected => write!(f, "channel disconnected"),
110            Self::Timeout => write!(f, "recv timed out"),
111        }
112    }
113}
114
115impl std::error::Error for RecvError {}
116
#[cfg(test)]
mod tests {
    use super::*;

    /// Every non-schema `StreamingError` variant renders its exact message.
    #[test]
    fn test_streaming_error_display() {
        assert_eq!(StreamingError::ChannelFull.to_string(), "channel is full");
        assert_eq!(
            StreamingError::ChannelClosed.to_string(),
            "channel is closed"
        );
        assert_eq!(
            StreamingError::Disconnected.to_string(),
            "channel is disconnected"
        );
        assert_eq!(
            StreamingError::InvalidConfig("bad".to_string()).to_string(),
            "invalid configuration: bad"
        );
        assert_eq!(StreamingError::Timeout.to_string(), "operation timed out");
        assert_eq!(
            StreamingError::Internal("boom".to_string()).to_string(),
            "internal error: boom"
        );
    }

    /// `full` tags the error as `ChannelFull` and keeps the rejected value.
    #[test]
    fn test_try_push_error() {
        let err = TryPushError::full(42);
        assert_eq!(err.error, StreamingError::ChannelFull);
        assert_eq!(err.to_string(), "try_push failed: channel is full");
        assert_eq!(err.into_inner(), 42);
    }

    /// `disconnected` tags the error as `Disconnected` and keeps the value.
    #[test]
    fn test_try_push_error_disconnected() {
        let err = TryPushError::disconnected("payload");
        assert_eq!(err.error, StreamingError::Disconnected);
        assert_eq!(err.to_string(), "try_push failed: channel is disconnected");
        assert_eq!(err.into_inner(), "payload");
    }

    /// Both `RecvError` variants render their exact message.
    #[test]
    fn test_recv_error_display() {
        assert_eq!(RecvError::Disconnected.to_string(), "channel disconnected");
        assert_eq!(RecvError::Timeout.to_string(), "recv timed out");
    }

    /// The schema mismatch message lists both field-name vectors in `Debug` form.
    #[test]
    fn test_schema_mismatch_display() {
        let err = StreamingError::SchemaMismatch {
            expected: vec!["a".to_string(), "b".to_string()],
            actual: vec!["x".to_string(), "y".to_string()],
        };
        let rendered = err.to_string();
        assert!(rendered.contains("schema mismatch"));
        assert_eq!(
            rendered,
            r#"schema mismatch: expected ["a", "b"], got ["x", "y"]"#
        );
    }
}