// laminar_core/compiler/fallback.rs

//! Fallback mechanism for pipeline compilation.
//!
//! [`ExecutablePipeline`] wraps either a successfully compiled pipeline or
//! a fallback marker indicating that compilation failed. The caller in
//! `laminar-sql` or `laminar-db` is responsible for providing interpreted
//! `DataFusion` execution when the fallback variant is returned.

use super::cache::CompilerCache;
use super::error::CompileError;
use super::pipeline::{CompiledPipeline, Pipeline, PipelineId};
use std::sync::Arc;

13/// A pipeline that is either compiled to native code or marked for fallback.
14#[derive(Debug)]
15pub enum ExecutablePipeline {
16    /// Successfully compiled to native code.
17    Compiled(Arc<CompiledPipeline>),
18    /// Compilation failed; the caller should use `DataFusion` interpreted execution.
19    Fallback {
20        /// The pipeline ID for correlation.
21        pipeline_id: PipelineId,
22        /// The reason compilation failed.
23        reason: CompileError,
24    },
25}
26
27impl ExecutablePipeline {
28    /// Attempts to compile the pipeline, returning a fallback on failure.
29    pub fn try_compile(cache: &mut CompilerCache, pipeline: &Pipeline) -> Self {
30        match cache.get_or_compile(pipeline) {
31            Ok(compiled) => Self::Compiled(compiled),
32            Err(reason) => Self::Fallback {
33                pipeline_id: pipeline.id,
34                reason,
35            },
36        }
37    }
38
39    /// Returns `true` if the pipeline was successfully compiled.
40    #[must_use]
41    pub fn is_compiled(&self) -> bool {
42        matches!(self, Self::Compiled(_))
43    }
44
45    /// Returns the compiled pipeline if available.
46    #[must_use]
47    pub fn as_compiled(&self) -> Option<&Arc<CompiledPipeline>> {
48        match self {
49            Self::Compiled(c) => Some(c),
50            Self::Fallback { .. } => None,
51        }
52    }
53
54    /// Returns the pipeline ID.
55    #[must_use]
56    pub fn pipeline_id(&self) -> PipelineId {
57        match self {
58            Self::Compiled(c) => c.id,
59            Self::Fallback { pipeline_id, .. } => *pipeline_id,
60        }
61    }
62}
63
#[cfg(test)]
mod tests {
    use super::*;
    use crate::compiler::pipeline::{Pipeline, PipelineId, PipelineStage};
    use arrow_schema::{DataType, Field, Schema};
    use datafusion_expr::{col, lit};
    use std::sync::Arc;

    /// Builds a schema of non-nullable fields from `(name, type)` pairs.
    fn make_schema(fields: Vec<(&str, DataType)>) -> Arc<Schema> {
        Arc::new(Schema::new(
            fields
                .into_iter()
                .map(|(name, dt)| Field::new(name, dt, false))
                .collect::<Vec<_>>(),
        ))
    }

    /// Schema shared by every test: a single non-nullable `Int64` column `x`.
    fn int_schema() -> Arc<Schema> {
        make_schema(vec![("x", DataType::Int64)])
    }

    /// Creates the compiler cache used by the tests (constructed with `10`).
    fn new_cache() -> CompilerCache {
        CompilerCache::new(10).expect("creating the test compiler cache should succeed")
    }

    /// Builds a one-stage pipeline filtering on `column > threshold` over [`int_schema`].
    ///
    /// Passing a column name other than `x` references a column that is not in
    /// the schema, which the tests use to provoke a compile error.
    fn filter_pipeline(id: PipelineId, column: &str, threshold: i64) -> Pipeline {
        let schema = int_schema();
        Pipeline {
            id,
            stages: vec![PipelineStage::Filter {
                predicate: col(column).gt(lit(threshold)),
            }],
            input_schema: Arc::clone(&schema),
            output_schema: schema,
        }
    }

    #[test]
    fn try_compile_success() {
        let mut cache = new_cache();
        let pipeline = filter_pipeline(PipelineId(0), "x", 10);

        let exec = ExecutablePipeline::try_compile(&mut cache, &pipeline);
        assert!(exec.is_compiled());
        assert!(exec.as_compiled().is_some());
        assert_eq!(exec.pipeline_id(), PipelineId(0));
    }

    #[test]
    fn try_compile_fallback() {
        let mut cache = new_cache();
        // Reference a non-existent column to trigger a compile error.
        let pipeline = filter_pipeline(PipelineId(1), "nonexistent", 10);

        let exec = ExecutablePipeline::try_compile(&mut cache, &pipeline);
        assert!(!exec.is_compiled());
        assert!(exec.as_compiled().is_none());
        assert_eq!(exec.pipeline_id(), PipelineId(1));
    }

    #[test]
    fn fallback_debug() {
        let mut cache = new_cache();
        // `missing` is not in the schema, so this is expected to fall back.
        let pipeline = filter_pipeline(PipelineId(2), "missing", 1);

        let exec = ExecutablePipeline::try_compile(&mut cache, &pipeline);
        let dbg = format!("{exec:?}");
        assert!(dbg.contains("Fallback"));
    }

    #[test]
    fn compiled_pipeline_id_matches() {
        let mut cache = new_cache();
        let schema = int_schema();
        let pipeline = Pipeline {
            id: PipelineId(42),
            stages: vec![],
            input_schema: Arc::clone(&schema),
            output_schema: schema,
        };

        let exec = ExecutablePipeline::try_compile(&mut cache, &pipeline);
        // NOTE(review): this only checks the ID, which both variants report; it
        // does not verify that a stage-less pipeline actually compiles. Consider
        // asserting `is_compiled()` once that behaviour is confirmed.
        assert_eq!(exec.pipeline_id(), PipelineId(42));
    }
}