// laminar_core/compiler/cache.rs
//! Compiler cache for compiled pipelines.
//!
//! [`CompilerCache`] avoids redundant compilation by caching compiled pipelines
//! keyed by a hash of their stage definitions. It owns a [`JitContext`] and
//! provides `get_or_compile` semantics.

use std::sync::Arc;

use rustc_hash::FxHashMap;

use super::error::CompileError;
use super::jit::JitContext;
use super::pipeline::{CompiledPipeline, Pipeline};
use super::pipeline_compiler::PipelineCompiler;
16/// Cache for compiled pipelines, keyed by a hash of pipeline stages.
17pub struct CompilerCache {
18    cache: FxHashMap<u64, Arc<CompiledPipeline>>,
19    jit: JitContext,
20    max_entries: usize,
21    /// Insertion order for simple FIFO eviction.
22    insertion_order: Vec<u64>,
23}
24
25impl CompilerCache {
26    /// Creates a new compiler cache with the given maximum number of entries.
27    ///
28    /// # Errors
29    ///
30    /// Returns [`CompileError`] if the JIT context cannot be initialized.
31    pub fn new(max_entries: usize) -> Result<Self, CompileError> {
32        Ok(Self {
33            cache: FxHashMap::default(),
34            jit: JitContext::new()?,
35            max_entries,
36            insertion_order: Vec::new(),
37        })
38    }
39
40    /// Returns a cached compiled pipeline, or compiles and caches it.
41    ///
42    /// # Errors
43    ///
44    /// Returns [`CompileError`] if compilation fails.
45    pub fn get_or_compile(
46        &mut self,
47        pipeline: &Pipeline,
48    ) -> Result<Arc<CompiledPipeline>, CompileError> {
49        let hash = hash_pipeline(pipeline);
50
51        if let Some(cached) = self.cache.get(&hash) {
52            return Ok(Arc::clone(cached));
53        }
54
55        // Evict oldest entry if at capacity.
56        if self.cache.len() >= self.max_entries && !self.insertion_order.is_empty() {
57            let oldest_key = self.insertion_order.remove(0);
58            self.cache.remove(&oldest_key);
59        }
60
61        let mut comp = PipelineCompiler::new(&mut self.jit);
62        let result = Arc::new(comp.compile(pipeline)?);
63
64        self.cache.insert(hash, Arc::clone(&result));
65        self.insertion_order.push(hash);
66
67        Ok(result)
68    }
69
70    /// Returns the number of cached entries.
71    #[must_use]
72    pub fn len(&self) -> usize {
73        self.cache.len()
74    }
75
76    /// Returns `true` if the cache is empty.
77    #[must_use]
78    pub fn is_empty(&self) -> bool {
79        self.cache.is_empty()
80    }
81
82    /// Clears all cached entries.
83    pub fn clear(&mut self) {
84        self.cache.clear();
85        self.insertion_order.clear();
86    }
87}
88
89impl std::fmt::Debug for CompilerCache {
90    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91        f.debug_struct("CompilerCache")
92            .field("entries", &self.cache.len())
93            .field("max_entries", &self.max_entries)
94            .finish_non_exhaustive()
95    }
96}
97
98/// Computes a hash of a pipeline's stages for cache keying.
99///
100/// Uses the `Debug` representation of each stage, which includes all expression
101/// details. This is not cryptographically secure but sufficient for caching.
102fn hash_pipeline(pipeline: &Pipeline) -> u64 {
103    use std::hash::{Hash, Hasher};
104
105    let mut hasher = rustc_hash::FxHasher::default();
106
107    for stage in &pipeline.stages {
108        let stage_repr = format!("{stage:?}");
109        stage_repr.hash(&mut hasher);
110    }
111
112    // Include schemas in the hash to distinguish pipelines with same stages
113    // but different input/output types.
114    let in_repr = format!("{:?}", pipeline.input_schema);
115    in_repr.hash(&mut hasher);
116    let out_repr = format!("{:?}", pipeline.output_schema);
117    out_repr.hash(&mut hasher);
118
119    hasher.finish()
120}
121
#[cfg(test)]
mod tests {
    use super::*;
    use crate::compiler::pipeline::{PipelineId, PipelineStage};
    use arrow_schema::{DataType, Field, Schema};
    use datafusion_expr::{col, lit};
    use std::sync::Arc;

    /// Builds a schema of non-nullable fields from `(name, type)` pairs.
    fn make_schema(fields: Vec<(&str, DataType)>) -> Arc<Schema> {
        let built: Vec<_> = fields
            .into_iter()
            .map(|(name, dt)| Field::new(name, dt, false))
            .collect();
        Arc::new(Schema::new(built))
    }

    /// Single-stage pipeline filtering `x > 10` over one `Int64` column.
    fn simple_filter_pipeline(id: u32) -> Pipeline {
        let schema = make_schema(vec![("x", DataType::Int64)]);
        Pipeline {
            id: PipelineId(id),
            stages: vec![PipelineStage::Filter {
                predicate: col("x").gt(lit(10_i64)),
            }],
            input_schema: Arc::clone(&schema),
            output_schema: schema,
        }
    }

    #[test]
    fn cache_miss_then_hit() {
        let mut cc = CompilerCache::new(10).unwrap();
        assert!(cc.is_empty());

        let p = simple_filter_pipeline(0);

        let miss = cc.get_or_compile(&p).unwrap();
        assert_eq!(cc.len(), 1);

        let hit = cc.get_or_compile(&p).unwrap();
        assert_eq!(cc.len(), 1);

        // A hit hands back the identical Arc, not a recompilation.
        assert!(Arc::ptr_eq(&miss, &hit));
    }

    #[test]
    fn cache_different_pipelines() {
        let mut cc = CompilerCache::new(10).unwrap();

        let first = simple_filter_pipeline(0);

        let schema = make_schema(vec![("x", DataType::Int64)]);
        let second = Pipeline {
            id: PipelineId(1),
            stages: vec![PipelineStage::Filter {
                predicate: col("x").lt(lit(100_i64)),
            }],
            input_schema: Arc::clone(&schema),
            output_schema: schema,
        };

        cc.get_or_compile(&first).unwrap();
        cc.get_or_compile(&second).unwrap();

        // Distinct predicates hash differently, so both entries coexist.
        assert_eq!(cc.len(), 2);
    }

    #[test]
    fn cache_eviction() {
        let mut cc = CompilerCache::new(2).unwrap();
        let schema = make_schema(vec![("x", DataType::Int64)]);

        // Insert three distinct pipelines into a two-entry cache.
        for i in 0..3 {
            let p = Pipeline {
                id: PipelineId(i),
                stages: vec![PipelineStage::Filter {
                    predicate: col("x").gt(lit(i64::from(i))),
                }],
                input_schema: Arc::clone(&schema),
                output_schema: Arc::clone(&schema),
            };
            cc.get_or_compile(&p).unwrap();
        }

        // FIFO eviction keeps the cache at capacity.
        assert_eq!(cc.len(), 2);
    }

    #[test]
    fn cache_clear() {
        let mut cc = CompilerCache::new(10).unwrap();

        cc.get_or_compile(&simple_filter_pipeline(0)).unwrap();
        assert_eq!(cc.len(), 1);

        cc.clear();
        assert!(cc.is_empty());
    }

    #[test]
    fn hash_stability() {
        // Identical stages and schemas; only the pipeline IDs differ.
        let a = simple_filter_pipeline(0);
        let b = simple_filter_pipeline(1);

        // The ID must not participate in the cache key.
        assert_eq!(hash_pipeline(&a), hash_pipeline(&b));
    }

    #[test]
    fn hash_different_for_different_stages() {
        let schema = make_schema(vec![("x", DataType::Int64)]);

        let gt = Pipeline {
            id: PipelineId(0),
            stages: vec![PipelineStage::Filter {
                predicate: col("x").gt(lit(10_i64)),
            }],
            input_schema: Arc::clone(&schema),
            output_schema: Arc::clone(&schema),
        };
        let lt = Pipeline {
            id: PipelineId(0),
            stages: vec![PipelineStage::Filter {
                predicate: col("x").lt(lit(10_i64)),
            }],
            input_schema: Arc::clone(&schema),
            output_schema: schema,
        };

        // Flipping the comparison operator must change the key.
        assert_ne!(hash_pipeline(&gt), hash_pipeline(&lt));
    }
}