// laminar_core/compiler/cache.rs

use std::sync::Arc;

use rustc_hash::FxHashMap;

use super::error::CompileError;
use super::jit::JitContext;
use super::pipeline::{CompiledPipeline, Pipeline};
use super::pipeline_compiler::PipelineCompiler;
/// Bounded cache of JIT-compiled pipelines, keyed by a structural hash
/// (see `hash_pipeline`).
///
/// Eviction is FIFO: once the cache is full, the oldest-inserted entry is
/// dropped to make room for a new one.
pub struct CompilerCache {
    /// Compiled pipelines keyed by the structural hash of their definition.
    cache: FxHashMap<u64, Arc<CompiledPipeline>>,
    /// JIT context the compiler emits code into; shared by all entries.
    jit: JitContext,
    /// Maximum number of entries retained before eviction kicks in.
    max_entries: usize,
    /// Cache keys in insertion order; the front is the next eviction victim.
    insertion_order: Vec<u64>,
}
24
25impl CompilerCache {
26 pub fn new(max_entries: usize) -> Result<Self, CompileError> {
32 Ok(Self {
33 cache: FxHashMap::default(),
34 jit: JitContext::new()?,
35 max_entries,
36 insertion_order: Vec::new(),
37 })
38 }
39
40 pub fn get_or_compile(
46 &mut self,
47 pipeline: &Pipeline,
48 ) -> Result<Arc<CompiledPipeline>, CompileError> {
49 let hash = hash_pipeline(pipeline);
50
51 if let Some(cached) = self.cache.get(&hash) {
52 return Ok(Arc::clone(cached));
53 }
54
55 if self.cache.len() >= self.max_entries && !self.insertion_order.is_empty() {
57 let oldest_key = self.insertion_order.remove(0);
58 self.cache.remove(&oldest_key);
59 }
60
61 let mut comp = PipelineCompiler::new(&mut self.jit);
62 let result = Arc::new(comp.compile(pipeline)?);
63
64 self.cache.insert(hash, Arc::clone(&result));
65 self.insertion_order.push(hash);
66
67 Ok(result)
68 }
69
70 #[must_use]
72 pub fn len(&self) -> usize {
73 self.cache.len()
74 }
75
76 #[must_use]
78 pub fn is_empty(&self) -> bool {
79 self.cache.is_empty()
80 }
81
82 pub fn clear(&mut self) {
84 self.cache.clear();
85 self.insertion_order.clear();
86 }
87}
88
89impl std::fmt::Debug for CompilerCache {
90 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91 f.debug_struct("CompilerCache")
92 .field("entries", &self.cache.len())
93 .field("max_entries", &self.max_entries)
94 .finish_non_exhaustive()
95 }
96}
97
98fn hash_pipeline(pipeline: &Pipeline) -> u64 {
103 use std::hash::{Hash, Hasher};
104
105 let mut hasher = rustc_hash::FxHasher::default();
106
107 for stage in &pipeline.stages {
108 let stage_repr = format!("{stage:?}");
109 stage_repr.hash(&mut hasher);
110 }
111
112 let in_repr = format!("{:?}", pipeline.input_schema);
115 in_repr.hash(&mut hasher);
116 let out_repr = format!("{:?}", pipeline.output_schema);
117 out_repr.hash(&mut hasher);
118
119 hasher.finish()
120}
121
#[cfg(test)]
mod tests {
    use super::*;
    use crate::compiler::pipeline::{PipelineId, PipelineStage};
    use arrow_schema::{DataType, Field, Schema};
    use datafusion_expr::{col, lit};
    use std::sync::Arc;

    /// Builds an Arrow schema of non-nullable fields from `(name, type)` pairs.
    fn make_schema(fields: Vec<(&str, DataType)>) -> Arc<Schema> {
        let built: Vec<Field> = fields
            .into_iter()
            .map(|(name, dt)| Field::new(name, dt, false))
            .collect();
        Arc::new(Schema::new(built))
    }

    /// One-stage pipeline over an Int64 column `x` filtering `x > 10`.
    fn simple_filter_pipeline(id: u32) -> Pipeline {
        let schema = make_schema(vec![("x", DataType::Int64)]);
        Pipeline {
            id: PipelineId(id),
            stages: vec![PipelineStage::Filter {
                predicate: col("x").gt(lit(10_i64)),
            }],
            input_schema: Arc::clone(&schema),
            output_schema: schema,
        }
    }

    #[test]
    fn cache_miss_then_hit() {
        let mut cache = CompilerCache::new(10).unwrap();
        assert!(cache.is_empty());

        let pipeline = simple_filter_pipeline(0);

        // Miss: compiles and stores one entry.
        let compiled = cache.get_or_compile(&pipeline).unwrap();
        assert_eq!(cache.len(), 1);

        // Hit: no new entry, and the exact same Arc comes back.
        let cached = cache.get_or_compile(&pipeline).unwrap();
        assert_eq!(cache.len(), 1);
        assert!(Arc::ptr_eq(&compiled, &cached));
    }

    #[test]
    fn cache_different_pipelines() {
        let mut cache = CompilerCache::new(10).unwrap();

        let first = simple_filter_pipeline(0);
        let schema = make_schema(vec![("x", DataType::Int64)]);
        // Different predicate (`x < 100`) => structurally distinct entry.
        let second = Pipeline {
            id: PipelineId(1),
            stages: vec![PipelineStage::Filter {
                predicate: col("x").lt(lit(100_i64)),
            }],
            input_schema: Arc::clone(&schema),
            output_schema: schema,
        };

        cache.get_or_compile(&first).unwrap();
        cache.get_or_compile(&second).unwrap();
        assert_eq!(cache.len(), 2);
    }

    #[test]
    fn cache_eviction() {
        // Capacity 2, three distinct pipelines => one eviction.
        let mut cache = CompilerCache::new(2).unwrap();
        let schema = make_schema(vec![("x", DataType::Int64)]);

        for i in 0..3 {
            let pipeline = Pipeline {
                id: PipelineId(i),
                stages: vec![PipelineStage::Filter {
                    predicate: col("x").gt(lit(i64::from(i))),
                }],
                input_schema: Arc::clone(&schema),
                output_schema: Arc::clone(&schema),
            };
            cache.get_or_compile(&pipeline).unwrap();
        }

        assert_eq!(cache.len(), 2);
    }

    #[test]
    fn cache_clear() {
        let mut cache = CompilerCache::new(10).unwrap();
        let pipeline = simple_filter_pipeline(0);

        cache.get_or_compile(&pipeline).unwrap();
        assert_eq!(cache.len(), 1);

        cache.clear();
        assert!(cache.is_empty());
    }

    #[test]
    fn hash_stability() {
        // Same structure, different ids: the pipeline id must not affect
        // the hash, so both hash equal.
        let first = simple_filter_pipeline(0);
        let second = simple_filter_pipeline(1);

        assert_eq!(hash_pipeline(&first), hash_pipeline(&second));
    }

    #[test]
    fn hash_different_for_different_stages() {
        let schema = make_schema(vec![("x", DataType::Int64)]);
        // Identical except for the predicate operator (`>` vs `<`).
        let gt = Pipeline {
            id: PipelineId(0),
            stages: vec![PipelineStage::Filter {
                predicate: col("x").gt(lit(10_i64)),
            }],
            input_schema: Arc::clone(&schema),
            output_schema: Arc::clone(&schema),
        };
        let lt = Pipeline {
            id: PipelineId(0),
            stages: vec![PipelineStage::Filter {
                predicate: col("x").lt(lit(10_i64)),
            }],
            input_schema: Arc::clone(&schema),
            output_schema: schema,
        };

        assert_ne!(hash_pipeline(&gt), hash_pipeline(&lt));
    }
}