// laminar_core/compiler/fallback.rs

use super::cache::CompilerCache;
use super::error::CompileError;
use super::pipeline::{CompiledPipeline, Pipeline, PipelineId};
use std::sync::Arc;

13#[derive(Debug)]
15pub enum ExecutablePipeline {
16 Compiled(Arc<CompiledPipeline>),
18 Fallback {
20 pipeline_id: PipelineId,
22 reason: CompileError,
24 },
25}
26
27impl ExecutablePipeline {
28 pub fn try_compile(cache: &mut CompilerCache, pipeline: &Pipeline) -> Self {
30 match cache.get_or_compile(pipeline) {
31 Ok(compiled) => Self::Compiled(compiled),
32 Err(reason) => Self::Fallback {
33 pipeline_id: pipeline.id,
34 reason,
35 },
36 }
37 }
38
39 #[must_use]
41 pub fn is_compiled(&self) -> bool {
42 matches!(self, Self::Compiled(_))
43 }
44
45 #[must_use]
47 pub fn as_compiled(&self) -> Option<&Arc<CompiledPipeline>> {
48 match self {
49 Self::Compiled(c) => Some(c),
50 Self::Fallback { .. } => None,
51 }
52 }
53
54 #[must_use]
56 pub fn pipeline_id(&self) -> PipelineId {
57 match self {
58 Self::Compiled(c) => c.id,
59 Self::Fallback { pipeline_id, .. } => *pipeline_id,
60 }
61 }
62}
63
#[cfg(test)]
mod tests {
    use super::*;
    use crate::compiler::pipeline::{Pipeline, PipelineId, PipelineStage};
    use arrow_schema::{DataType, Field, Schema};
    use datafusion_expr::{col, lit};
    use std::sync::Arc;

    /// Builds a schema of non-nullable fields from `(name, type)` pairs.
    fn make_schema(fields: Vec<(&str, DataType)>) -> Arc<Schema> {
        Arc::new(Schema::new(
            fields
                .into_iter()
                .map(|(name, dt)| Field::new(name, dt, false))
                .collect::<Vec<_>>(),
        ))
    }

    /// A filter over an existing column is expected to compile.
    #[test]
    fn try_compile_success() {
        let mut cache = CompilerCache::new(10).unwrap();
        let schema = make_schema(vec![("x", DataType::Int64)]);
        let pipeline = Pipeline {
            id: PipelineId(0),
            stages: vec![PipelineStage::Filter {
                predicate: col("x").gt(lit(10_i64)),
            }],
            input_schema: Arc::clone(&schema),
            output_schema: schema,
        };

        let exec = ExecutablePipeline::try_compile(&mut cache, &pipeline);
        assert!(exec.is_compiled());
        assert!(exec.as_compiled().is_some());
        assert_eq!(exec.pipeline_id(), PipelineId(0));
    }

    /// A filter over an unknown column is expected to end up as a fallback.
    #[test]
    fn try_compile_fallback() {
        let mut cache = CompilerCache::new(10).unwrap();
        let schema = make_schema(vec![("x", DataType::Int64)]);
        let pipeline = Pipeline {
            id: PipelineId(1),
            stages: vec![PipelineStage::Filter {
                // The schema only declares `x`, so this column does not exist.
                predicate: col("nonexistent").gt(lit(10_i64)),
            }],
            input_schema: Arc::clone(&schema),
            output_schema: schema,
        };

        let exec = ExecutablePipeline::try_compile(&mut cache, &pipeline);
        assert!(!exec.is_compiled());
        assert!(exec.as_compiled().is_none());
        assert_eq!(exec.pipeline_id(), PipelineId(1));
    }

    /// The `Debug` output of a fallback names the variant.
    #[test]
    fn fallback_debug() {
        let mut cache = CompilerCache::new(10).unwrap();
        let schema = make_schema(vec![("x", DataType::Int64)]);
        let pipeline = Pipeline {
            id: PipelineId(2),
            stages: vec![PipelineStage::Filter {
                predicate: col("missing").gt(lit(1_i64)),
            }],
            input_schema: Arc::clone(&schema),
            output_schema: schema,
        };

        let exec = ExecutablePipeline::try_compile(&mut cache, &pipeline);
        let dbg = format!("{exec:?}");
        assert!(dbg.contains("Fallback"));
    }

    /// The reported id matches the source pipeline's id for a stage-less pipeline.
    #[test]
    fn compiled_pipeline_id_matches() {
        let mut cache = CompilerCache::new(10).unwrap();
        let schema = make_schema(vec![("x", DataType::Int64)]);
        let pipeline = Pipeline {
            id: PipelineId(42),
            stages: vec![],
            input_schema: Arc::clone(&schema),
            output_schema: schema,
        };

        let exec = ExecutablePipeline::try_compile(&mut cache, &pipeline);
        assert_eq!(exec.pipeline_id(), PipelineId(42));
    }
}