Skip to main content

laminar_db/
error.rs

1//! Error types for the `LaminarDB` facade.
2
3use laminar_core::error_codes;
4
5/// Errors from database operations.
6#[derive(Debug, thiserror::Error)]
7pub enum DbError {
8    /// SQL parse error
9    Sql(#[from] laminar_sql::Error),
10
11    /// Core engine error
12    Engine(#[from] laminar_core::Error),
13
14    /// Streaming API error
15    Streaming(#[from] laminar_core::streaming::StreamingError),
16
17    /// `DataFusion` error (translated to user-friendly messages on display)
18    DataFusion(#[from] datafusion_common::DataFusionError),
19
20    /// Source not found
21    SourceNotFound(String),
22
23    /// Sink not found
24    SinkNotFound(String),
25
26    /// Query not found
27    QueryNotFound(String),
28
29    /// Source already exists
30    SourceAlreadyExists(String),
31
32    /// Sink already exists
33    SinkAlreadyExists(String),
34
35    /// Stream not found
36    StreamNotFound(String),
37
38    /// Stream already exists
39    StreamAlreadyExists(String),
40
41    /// Table not found
42    TableNotFound(String),
43
44    /// Table already exists
45    TableAlreadyExists(String),
46
47    /// Insert error
48    InsertError(String),
49
50    /// Schema mismatch between Rust type and SQL definition
51    SchemaMismatch(String),
52
53    /// Invalid SQL statement for the operation
54    InvalidOperation(String),
55
56    /// SQL parse error (from streaming parser)
57    SqlParse(#[from] laminar_sql::parser::ParseError),
58
59    /// Database is shut down
60    Shutdown,
61
62    /// Checkpoint error
63    Checkpoint(String),
64
65    /// Checkpoint store error (preserves structured source error).
66    CheckpointStore(#[from] laminar_storage::checkpoint_store::CheckpointStoreError),
67
68    /// Unresolved config variable
69    UnresolvedConfigVar(String),
70
71    /// Connector error
72    Connector(String),
73
74    /// Connector operation error (preserves structured source error).
75    ConnectorOp(#[from] laminar_connectors::error::ConnectorError),
76
77    /// Pipeline error (start/shutdown lifecycle)
78    Pipeline(String),
79
80    /// `BackpressurePolicy::Fail` tripped; coordinator halts the pipeline.
81    BackpressureFail(String),
82
83    /// Query pipeline error — wraps a `DataFusion` error with stream context.
84    /// Unlike `Pipeline`, this variant is translated to user-friendly messages.
85    QueryPipeline {
86        /// The stream or query name where the error occurred.
87        context: String,
88        /// The translated error message (already processed through
89        /// `translate_datafusion_error`).
90        translated: String,
91    },
92
93    /// Materialized view error
94    MaterializedView(String),
95
96    /// Storage backend error.
97    Storage(String),
98
99    /// Configuration / profile validation error
100    Config(String),
101
102    /// Operation is not yet implemented.
103    Unsupported(String),
104}
105
106impl DbError {
107    /// Create a `QueryPipeline` error from a `DataFusion` error with stream context.
108    ///
109    /// The `DataFusion` error is translated to a user-friendly message with
110    /// structured error codes. The raw `DataFusion` internals are never exposed.
111    pub fn query_pipeline(
112        context: impl Into<String>,
113        df_error: &datafusion_common::DataFusionError,
114    ) -> Self {
115        let translated = laminar_sql::error::translate_datafusion_error(&df_error.to_string());
116        Self::QueryPipeline {
117            context: context.into(),
118            translated: translated.to_string(),
119        }
120    }
121
122    /// Create a `QueryPipeline` error from a `DataFusion` error with stream
123    /// context and available column names for typo suggestions.
124    pub fn query_pipeline_with_columns(
125        context: impl Into<String>,
126        df_error: &datafusion_common::DataFusionError,
127        available_columns: &[&str],
128    ) -> Self {
129        let translated = laminar_sql::error::translate_datafusion_error_with_context(
130            &df_error.to_string(),
131            Some(available_columns),
132        );
133        Self::QueryPipeline {
134            context: context.into(),
135            translated: translated.to_string(),
136        }
137    }
138
139    /// Create a `QueryPipeline` error from an Arrow error with stream context.
140    pub fn query_pipeline_arrow(
141        context: impl Into<String>,
142        arrow_error: &arrow::error::ArrowError,
143    ) -> Self {
144        let translated = laminar_sql::error::translate_datafusion_error(&arrow_error.to_string());
145        Self::QueryPipeline {
146            context: context.into(),
147            translated: translated.to_string(),
148        }
149    }
150
151    /// Returns the structured `LDB-NNNN` error code for this error.
152    ///
153    /// Every `DbError` variant maps to a stable error code that can be used
154    /// for programmatic handling, log searching, and metrics.
155    #[must_use]
156    pub fn code(&self) -> &'static str {
157        match self {
158            Self::Sql(_) | Self::SqlParse(_) => error_codes::SQL_UNSUPPORTED,
159            Self::Engine(_) | Self::Streaming(_) => error_codes::INTERNAL,
160            Self::DataFusion(_) => error_codes::QUERY_EXECUTION_FAILED,
161            Self::SourceNotFound(_) => error_codes::SOURCE_NOT_FOUND,
162            Self::SinkNotFound(_) => error_codes::SINK_NOT_FOUND,
163            Self::QueryNotFound(_) | Self::StreamNotFound(_) | Self::TableNotFound(_) => {
164                error_codes::SQL_TABLE_NOT_FOUND
165            }
166            Self::SourceAlreadyExists(_)
167            | Self::StreamAlreadyExists(_)
168            | Self::TableAlreadyExists(_) => error_codes::SOURCE_ALREADY_EXISTS,
169            Self::SinkAlreadyExists(_) => error_codes::SINK_ALREADY_EXISTS,
170            Self::InsertError(_) => error_codes::CONNECTOR_WRITE_ERROR,
171            Self::SchemaMismatch(_) => error_codes::SCHEMA_MISMATCH,
172            Self::InvalidOperation(_) | Self::Unsupported(_) => error_codes::INVALID_OPERATION,
173            Self::Shutdown => error_codes::SHUTDOWN,
174            Self::Checkpoint(_) | Self::CheckpointStore(_) => error_codes::CHECKPOINT_FAILED,
175            Self::UnresolvedConfigVar(_) => error_codes::UNRESOLVED_CONFIG_VAR,
176            Self::Connector(_) | Self::ConnectorOp(_) => error_codes::CONNECTOR_CONNECTION_FAILED,
177            Self::Pipeline(_) | Self::BackpressureFail(_) => error_codes::PIPELINE_ERROR,
178            Self::QueryPipeline { .. } => error_codes::QUERY_PIPELINE_ERROR,
179            Self::MaterializedView(_) => error_codes::MATERIALIZED_VIEW_ERROR,
180            Self::Storage(_) => error_codes::WAL_ERROR,
181            Self::Config(_) => error_codes::INVALID_CONFIG,
182        }
183    }
184
185    /// Whether this error is transient (retryable).
186    #[must_use]
187    pub fn is_transient(&self) -> bool {
188        match self {
189            Self::Streaming(_)
190            | Self::Connector(_)
191            | Self::Checkpoint(_)
192            | Self::CheckpointStore(_) => true,
193            Self::ConnectorOp(e) => e.is_transient(),
194            _ => false,
195        }
196    }
197}
198
199impl std::fmt::Display for DbError {
200    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
201        match self {
202            Self::Sql(e) => write!(f, "SQL error: {e}"),
203            Self::Engine(e) => write!(f, "Engine error: {e}"),
204            Self::Streaming(e) => write!(f, "Streaming error: {e}"),
205            Self::DataFusion(e) => {
206                let translated = laminar_sql::error::translate_datafusion_error(&e.to_string());
207                write!(f, "{translated}")
208            }
209            Self::SourceNotFound(name) => {
210                write!(f, "[{}] Source '{name}' not found", self.code())
211            }
212            Self::SinkNotFound(name) => {
213                write!(f, "[{}] Sink '{name}' not found", self.code())
214            }
215            Self::QueryNotFound(name) => {
216                write!(f, "[{}] Query '{name}' not found", self.code())
217            }
218            Self::SourceAlreadyExists(name) => {
219                write!(f, "[{}] Source '{name}' already exists", self.code())
220            }
221            Self::SinkAlreadyExists(name) => {
222                write!(f, "[{}] Sink '{name}' already exists", self.code())
223            }
224            Self::StreamNotFound(name) => {
225                write!(f, "[{}] Stream '{name}' not found", self.code())
226            }
227            Self::StreamAlreadyExists(name) => {
228                write!(f, "[{}] Stream '{name}' already exists", self.code())
229            }
230            Self::TableNotFound(name) => {
231                write!(f, "[{}] Table '{name}' not found", self.code())
232            }
233            Self::TableAlreadyExists(name) => {
234                write!(f, "[{}] Table '{name}' already exists", self.code())
235            }
236            Self::InsertError(msg) => {
237                write!(f, "[{}] Insert error: {msg}", self.code())
238            }
239            Self::SchemaMismatch(msg) => {
240                write!(f, "[{}] Schema mismatch: {msg}", self.code())
241            }
242            Self::InvalidOperation(msg) => {
243                write!(f, "[{}] Invalid operation: {msg}", self.code())
244            }
245            Self::SqlParse(e) => write!(f, "SQL parse error: {e}"),
246            Self::Shutdown => {
247                write!(f, "[{}] Database is shut down", self.code())
248            }
249            Self::Checkpoint(msg) => {
250                write!(f, "[{}] Checkpoint error: {msg}", self.code())
251            }
252            Self::CheckpointStore(e) => {
253                write!(f, "[{}] Checkpoint store error: {e}", self.code())
254            }
255            Self::UnresolvedConfigVar(msg) => {
256                write!(f, "[{}] Unresolved config variable: {msg}", self.code())
257            }
258            Self::Connector(msg) => {
259                write!(f, "[{}] Connector error: {msg}", self.code())
260            }
261            Self::ConnectorOp(e) => {
262                write!(f, "[{}] Connector error: {e}", self.code())
263            }
264            Self::Pipeline(msg) => {
265                write!(f, "[{}] Pipeline error: {msg}", self.code())
266            }
267            Self::BackpressureFail(msg) => {
268                write!(f, "[{}] Backpressure fail: {msg}", self.code())
269            }
270            Self::QueryPipeline {
271                context,
272                translated,
273            } => write!(f, "Stream '{context}': {translated}"),
274            Self::MaterializedView(msg) => {
275                write!(f, "[{}] Materialized view error: {msg}", self.code())
276            }
277            Self::Storage(msg) => {
278                write!(f, "[{}] Storage error: {msg}", self.code())
279            }
280            Self::Config(msg) => {
281                write!(f, "[{}] Config error: {msg}", self.code())
282            }
283            Self::Unsupported(msg) => {
284                write!(f, "[{}] Unsupported: {msg}", self.code())
285            }
286        }
287    }
288}