1use laminar_core::error_codes;
4
5#[derive(Debug, thiserror::Error)]
7pub enum DbError {
8 Sql(#[from] laminar_sql::Error),
10
11 Engine(#[from] laminar_core::Error),
13
14 Streaming(#[from] laminar_core::streaming::StreamingError),
16
17 DataFusion(#[from] datafusion_common::DataFusionError),
19
20 SourceNotFound(String),
22
23 SinkNotFound(String),
25
26 QueryNotFound(String),
28
29 SourceAlreadyExists(String),
31
32 SinkAlreadyExists(String),
34
35 StreamNotFound(String),
37
38 StreamAlreadyExists(String),
40
41 TableNotFound(String),
43
44 TableAlreadyExists(String),
46
47 InsertError(String),
49
50 SchemaMismatch(String),
52
53 InvalidOperation(String),
55
56 SqlParse(#[from] laminar_sql::parser::ParseError),
58
59 Shutdown,
61
62 Checkpoint(String),
64
65 CheckpointStore(#[from] laminar_storage::checkpoint_store::CheckpointStoreError),
67
68 UnresolvedConfigVar(String),
70
71 Connector(String),
73
74 ConnectorOp(#[from] laminar_connectors::error::ConnectorError),
76
77 Pipeline(String),
79
80 BackpressureFail(String),
82
83 QueryPipeline {
86 context: String,
88 translated: String,
91 },
92
93 MaterializedView(String),
95
96 Storage(String),
98
99 Config(String),
101
102 Unsupported(String),
104}
105
106impl DbError {
107 pub fn query_pipeline(
112 context: impl Into<String>,
113 df_error: &datafusion_common::DataFusionError,
114 ) -> Self {
115 let translated = laminar_sql::error::translate_datafusion_error(&df_error.to_string());
116 Self::QueryPipeline {
117 context: context.into(),
118 translated: translated.to_string(),
119 }
120 }
121
122 pub fn query_pipeline_with_columns(
125 context: impl Into<String>,
126 df_error: &datafusion_common::DataFusionError,
127 available_columns: &[&str],
128 ) -> Self {
129 let translated = laminar_sql::error::translate_datafusion_error_with_context(
130 &df_error.to_string(),
131 Some(available_columns),
132 );
133 Self::QueryPipeline {
134 context: context.into(),
135 translated: translated.to_string(),
136 }
137 }
138
139 pub fn query_pipeline_arrow(
141 context: impl Into<String>,
142 arrow_error: &arrow::error::ArrowError,
143 ) -> Self {
144 let translated = laminar_sql::error::translate_datafusion_error(&arrow_error.to_string());
145 Self::QueryPipeline {
146 context: context.into(),
147 translated: translated.to_string(),
148 }
149 }
150
151 #[must_use]
156 pub fn code(&self) -> &'static str {
157 match self {
158 Self::Sql(_) | Self::SqlParse(_) => error_codes::SQL_UNSUPPORTED,
159 Self::Engine(_) | Self::Streaming(_) => error_codes::INTERNAL,
160 Self::DataFusion(_) => error_codes::QUERY_EXECUTION_FAILED,
161 Self::SourceNotFound(_) => error_codes::SOURCE_NOT_FOUND,
162 Self::SinkNotFound(_) => error_codes::SINK_NOT_FOUND,
163 Self::QueryNotFound(_) | Self::StreamNotFound(_) | Self::TableNotFound(_) => {
164 error_codes::SQL_TABLE_NOT_FOUND
165 }
166 Self::SourceAlreadyExists(_)
167 | Self::StreamAlreadyExists(_)
168 | Self::TableAlreadyExists(_) => error_codes::SOURCE_ALREADY_EXISTS,
169 Self::SinkAlreadyExists(_) => error_codes::SINK_ALREADY_EXISTS,
170 Self::InsertError(_) => error_codes::CONNECTOR_WRITE_ERROR,
171 Self::SchemaMismatch(_) => error_codes::SCHEMA_MISMATCH,
172 Self::InvalidOperation(_) | Self::Unsupported(_) => error_codes::INVALID_OPERATION,
173 Self::Shutdown => error_codes::SHUTDOWN,
174 Self::Checkpoint(_) | Self::CheckpointStore(_) => error_codes::CHECKPOINT_FAILED,
175 Self::UnresolvedConfigVar(_) => error_codes::UNRESOLVED_CONFIG_VAR,
176 Self::Connector(_) | Self::ConnectorOp(_) => error_codes::CONNECTOR_CONNECTION_FAILED,
177 Self::Pipeline(_) | Self::BackpressureFail(_) => error_codes::PIPELINE_ERROR,
178 Self::QueryPipeline { .. } => error_codes::QUERY_PIPELINE_ERROR,
179 Self::MaterializedView(_) => error_codes::MATERIALIZED_VIEW_ERROR,
180 Self::Storage(_) => error_codes::WAL_ERROR,
181 Self::Config(_) => error_codes::INVALID_CONFIG,
182 }
183 }
184
185 #[must_use]
187 pub fn is_transient(&self) -> bool {
188 match self {
189 Self::Streaming(_)
190 | Self::Connector(_)
191 | Self::Checkpoint(_)
192 | Self::CheckpointStore(_) => true,
193 Self::ConnectorOp(e) => e.is_transient(),
194 _ => false,
195 }
196 }
197}
198
199impl std::fmt::Display for DbError {
200 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
201 match self {
202 Self::Sql(e) => write!(f, "SQL error: {e}"),
203 Self::Engine(e) => write!(f, "Engine error: {e}"),
204 Self::Streaming(e) => write!(f, "Streaming error: {e}"),
205 Self::DataFusion(e) => {
206 let translated = laminar_sql::error::translate_datafusion_error(&e.to_string());
207 write!(f, "{translated}")
208 }
209 Self::SourceNotFound(name) => {
210 write!(f, "[{}] Source '{name}' not found", self.code())
211 }
212 Self::SinkNotFound(name) => {
213 write!(f, "[{}] Sink '{name}' not found", self.code())
214 }
215 Self::QueryNotFound(name) => {
216 write!(f, "[{}] Query '{name}' not found", self.code())
217 }
218 Self::SourceAlreadyExists(name) => {
219 write!(f, "[{}] Source '{name}' already exists", self.code())
220 }
221 Self::SinkAlreadyExists(name) => {
222 write!(f, "[{}] Sink '{name}' already exists", self.code())
223 }
224 Self::StreamNotFound(name) => {
225 write!(f, "[{}] Stream '{name}' not found", self.code())
226 }
227 Self::StreamAlreadyExists(name) => {
228 write!(f, "[{}] Stream '{name}' already exists", self.code())
229 }
230 Self::TableNotFound(name) => {
231 write!(f, "[{}] Table '{name}' not found", self.code())
232 }
233 Self::TableAlreadyExists(name) => {
234 write!(f, "[{}] Table '{name}' already exists", self.code())
235 }
236 Self::InsertError(msg) => {
237 write!(f, "[{}] Insert error: {msg}", self.code())
238 }
239 Self::SchemaMismatch(msg) => {
240 write!(f, "[{}] Schema mismatch: {msg}", self.code())
241 }
242 Self::InvalidOperation(msg) => {
243 write!(f, "[{}] Invalid operation: {msg}", self.code())
244 }
245 Self::SqlParse(e) => write!(f, "SQL parse error: {e}"),
246 Self::Shutdown => {
247 write!(f, "[{}] Database is shut down", self.code())
248 }
249 Self::Checkpoint(msg) => {
250 write!(f, "[{}] Checkpoint error: {msg}", self.code())
251 }
252 Self::CheckpointStore(e) => {
253 write!(f, "[{}] Checkpoint store error: {e}", self.code())
254 }
255 Self::UnresolvedConfigVar(msg) => {
256 write!(f, "[{}] Unresolved config variable: {msg}", self.code())
257 }
258 Self::Connector(msg) => {
259 write!(f, "[{}] Connector error: {msg}", self.code())
260 }
261 Self::ConnectorOp(e) => {
262 write!(f, "[{}] Connector error: {e}", self.code())
263 }
264 Self::Pipeline(msg) => {
265 write!(f, "[{}] Pipeline error: {msg}", self.code())
266 }
267 Self::BackpressureFail(msg) => {
268 write!(f, "[{}] Backpressure fail: {msg}", self.code())
269 }
270 Self::QueryPipeline {
271 context,
272 translated,
273 } => write!(f, "Stream '{context}': {translated}"),
274 Self::MaterializedView(msg) => {
275 write!(f, "[{}] Materialized view error: {msg}", self.code())
276 }
277 Self::Storage(msg) => {
278 write!(f, "[{}] Storage error: {msg}", self.code())
279 }
280 Self::Config(msg) => {
281 write!(f, "[{}] Config error: {msg}", self.code())
282 }
283 Self::Unsupported(msg) => {
284 write!(f, "[{}] Unsupported: {msg}", self.code())
285 }
286 }
287 }
288}