Skip to main content

laminar_core/compiler/
pipeline_compiler.rs

1//! Cranelift code generation for compiled pipelines.
2//!
3//! [`PipelineCompiler`] takes a [`Pipeline`] (a chain of Filter, Project, and
4//! `KeyExtract` stages) and generates a single native function that processes
5//! one input row and writes the result to an output row.
6//!
7//! The generated function has signature:
8//! ```text
9//! fn(input_row: *const u8, output_row: *mut u8) -> u8
10//! ```
11//!
12//! Returns `0` (Drop), `1` (Emit), or `2` (Error).
13
14use std::sync::Arc;
15
16use cranelift_codegen::ir::types::{self as cl_types};
17use cranelift_codegen::ir::{AbiParam, Function, InstBuilder, MemFlags, UserFuncName, Value};
18use cranelift_codegen::Context;
19use cranelift_frontend::FunctionBuilder;
20use cranelift_module::Module;
21
22use super::error::CompileError;
23use super::expr::{compile_expr_inner, CompiledValue};
24use super::jit::JitContext;
25use super::pipeline::{CompiledPipeline, Pipeline, PipelineFn, PipelineStage};
26use super::row::RowSchema;
27
/// Cranelift type used for the row-pointer parameters of generated functions.
///
/// The value is fixed at 64 bits; it is not queried from the target ISA.
/// NOTE(review): presumably only 64-bit targets are supported — confirm.
const PTR_TYPE: cranelift_codegen::ir::Type = cl_types::I64;
30
/// Compiles [`Pipeline`]s into native functions via Cranelift.
///
/// The compiler holds an exclusive borrow of a [`JitContext`] for `'a`; the
/// same context can be reused to compile several pipelines one after another.
pub struct PipelineCompiler<'a> {
    /// JIT context used to name, declare, define, and finalize generated functions.
    jit: &'a mut JitContext,
}
35
36impl<'a> PipelineCompiler<'a> {
37    /// Creates a new pipeline compiler using the given JIT context.
38    pub fn new(jit: &'a mut JitContext) -> Self {
39        Self { jit }
40    }
41
42    /// Compiles a [`Pipeline`] into a native [`CompiledPipeline`].
43    ///
44    /// # Errors
45    ///
46    /// Returns [`CompileError`] if any expression in the pipeline cannot be compiled.
47    pub fn compile(&mut self, pipeline: &Pipeline) -> Result<CompiledPipeline, CompileError> {
48        let input_schema = RowSchema::from_arrow(&pipeline.input_schema)
49            .map_err(|e| CompileError::UnsupportedExpr(format!("input schema: {e}")))?;
50        let output_schema = RowSchema::from_arrow(&pipeline.output_schema)
51            .map_err(|e| CompileError::UnsupportedExpr(format!("output schema: {e}")))?;
52
53        let func_ptr = self.compile_function(pipeline, &input_schema, &output_schema)?;
54
55        Ok(CompiledPipeline::new(
56            pipeline.id,
57            func_ptr,
58            Arc::new(input_schema),
59            Arc::new(output_schema),
60        ))
61    }
62
63    /// Generates the Cranelift function for the pipeline.
64    fn compile_function(
65        &mut self,
66        pipeline: &Pipeline,
67        input_schema: &RowSchema,
68        output_schema: &RowSchema,
69    ) -> Result<PipelineFn, CompileError> {
70        let func_name = self
71            .jit
72            .next_func_name(&format!("pipeline_{}", pipeline.id.0));
73
74        let mut sig = self.jit.module().make_signature();
75        sig.params.push(AbiParam::new(PTR_TYPE)); // input_row
76        sig.params.push(AbiParam::new(PTR_TYPE)); // output_row
77        sig.returns.push(AbiParam::new(cl_types::I8)); // action
78
79        let func_id = self.jit.module().declare_function(
80            &func_name,
81            cranelift_module::Linkage::Local,
82            &sig,
83        )?;
84
85        let mut func = Function::with_name_signature(UserFuncName::testcase(&func_name), sig);
86
87        {
88            let builder_ctx = self.jit.builder_ctx();
89            let mut builder = FunctionBuilder::new(&mut func, builder_ctx);
90
91            let entry = builder.create_block();
92            builder.append_block_params_for_function_params(entry);
93            builder.switch_to_block(entry);
94            builder.seal_block(entry);
95
96            let input_ptr = builder.block_params(entry)[0];
97            let output_ptr = builder.block_params(entry)[1];
98
99            // Create the "drop" exit block.
100            let drop_block = builder.create_block();
101
102            // Process each stage sequentially.
103            for stage in &pipeline.stages {
104                match stage {
105                    PipelineStage::Filter { predicate } => {
106                        emit_filter_stage(
107                            &mut builder,
108                            input_schema,
109                            predicate,
110                            input_ptr,
111                            drop_block,
112                        )?;
113                    }
114                    PipelineStage::Project { expressions } => {
115                        emit_project_stage(
116                            &mut builder,
117                            input_schema,
118                            output_schema,
119                            expressions,
120                            input_ptr,
121                            output_ptr,
122                        )?;
123                    }
124                    PipelineStage::KeyExtract { key_exprs } => {
125                        emit_key_extract_stage(
126                            &mut builder,
127                            input_schema,
128                            output_schema,
129                            key_exprs,
130                            input_ptr,
131                            output_ptr,
132                        )?;
133                    }
134                }
135            }
136
137            // Emit path: return 1.
138            let emit_val = builder.ins().iconst(cl_types::I8, 1);
139            builder.ins().return_(&[emit_val]);
140
141            // Drop path: return 0.
142            builder.switch_to_block(drop_block);
143            builder.seal_block(drop_block);
144            let drop_val = builder.ins().iconst(cl_types::I8, 0);
145            builder.ins().return_(&[drop_val]);
146
147            builder.finalize();
148        }
149
150        let mut ctx = Context::for_function(func);
151        self.jit
152            .module()
153            .define_function(func_id, &mut ctx)
154            .map_err(|e| CompileError::Cranelift(Box::new(e)))?;
155        self.jit.module().finalize_definitions().unwrap();
156
157        let code_ptr = self.jit.module().get_finalized_function(func_id);
158        // SAFETY: The generated function has the declared ABI signature.
159        Ok(unsafe { std::mem::transmute::<*const u8, PipelineFn>(code_ptr) })
160    }
161}
162
163/// Emits a filter stage: evaluate predicate, branch to `drop_block` if false/null.
164fn emit_filter_stage(
165    builder: &mut FunctionBuilder,
166    input_schema: &RowSchema,
167    predicate: &datafusion_expr::Expr,
168    input_ptr: Value,
169    drop_block: cranelift_codegen::ir::Block,
170) -> Result<(), CompileError> {
171    let compiled = compile_expr_inner(builder, input_schema, predicate, input_ptr)?;
172
173    // If the result is nullable, treat null as false (drop).
174    let result = if compiled.is_nullable {
175        if let Some(null_flag) = compiled.null_flag {
176            let zero = builder.ins().iconst(cl_types::I8, 0);
177            let is_null = builder.ins().icmp_imm(
178                cranelift_codegen::ir::condcodes::IntCC::NotEqual,
179                null_flag,
180                0,
181            );
182            builder.ins().select(is_null, zero, compiled.value)
183        } else {
184            compiled.value
185        }
186    } else {
187        compiled.value
188    };
189
190    // Branch: if result == 0, jump to drop_block.
191    let continue_block = builder.create_block();
192    builder
193        .ins()
194        .brif(result, continue_block, &[], drop_block, &[]);
195    builder.switch_to_block(continue_block);
196    builder.seal_block(continue_block);
197
198    Ok(())
199}
200
201/// Emits a projection stage: evaluate each expression and store to output row.
202fn emit_project_stage(
203    builder: &mut FunctionBuilder,
204    input_schema: &RowSchema,
205    output_schema: &RowSchema,
206    expressions: &[(datafusion_expr::Expr, String)],
207    input_ptr: Value,
208    output_ptr: Value,
209) -> Result<(), CompileError> {
210    let mem_flags = MemFlags::trusted();
211
212    for (i, (expr, _name)) in expressions.iter().enumerate() {
213        let compiled = compile_expr_inner(builder, input_schema, expr, input_ptr)?;
214
215        // Write value to the output row at the field's offset.
216        let out_layout = output_schema.field(i);
217        #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
218        let out_offset = out_layout.offset as i32;
219        builder
220            .ins()
221            .store(mem_flags, compiled.value, output_ptr, out_offset);
222
223        // Write null bit in output null bitmap.
224        write_null_bit(builder, output_schema, i, output_ptr, &compiled);
225    }
226
227    Ok(())
228}
229
230/// Emits a key-extract stage: evaluate key expressions and write to output row.
231fn emit_key_extract_stage(
232    builder: &mut FunctionBuilder,
233    input_schema: &RowSchema,
234    output_schema: &RowSchema,
235    key_exprs: &[datafusion_expr::Expr],
236    input_ptr: Value,
237    output_ptr: Value,
238) -> Result<(), CompileError> {
239    let mem_flags = MemFlags::trusted();
240
241    for (i, expr) in key_exprs.iter().enumerate() {
242        if i >= output_schema.field_count() {
243            break;
244        }
245
246        let compiled = compile_expr_inner(builder, input_schema, expr, input_ptr)?;
247
248        let out_layout = output_schema.field(i);
249        #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
250        let out_offset = out_layout.offset as i32;
251        builder
252            .ins()
253            .store(mem_flags, compiled.value, output_ptr, out_offset);
254
255        write_null_bit(builder, output_schema, i, output_ptr, &compiled);
256    }
257
258    Ok(())
259}
260
261/// Writes the null bit for a field in the output row's null bitmap.
262fn write_null_bit(
263    builder: &mut FunctionBuilder,
264    schema: &RowSchema,
265    field_idx: usize,
266    row_ptr: Value,
267    compiled: &CompiledValue,
268) {
269    let layout = schema.field(field_idx);
270    let null_bit = layout.null_bit;
271    let byte_idx = RowSchema::header_size() + null_bit / 8;
272    let bit_idx = null_bit % 8;
273
274    let mem_flags = MemFlags::trusted();
275    #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
276    let byte_offset = byte_idx as i32;
277
278    if compiled.is_nullable {
279        if let Some(null_flag) = compiled.null_flag {
280            // Load current byte, set or clear the null bit based on null_flag.
281            let current_byte = builder
282                .ins()
283                .load(cl_types::I8, mem_flags, row_ptr, byte_offset);
284
285            let mask = builder.ins().iconst(cl_types::I8, 1 << bit_idx);
286            let inv_mask = builder.ins().iconst(cl_types::I8, !(1_i64 << bit_idx));
287
288            // If null_flag != 0, set the bit; otherwise clear it.
289            let is_null = builder.ins().icmp_imm(
290                cranelift_codegen::ir::condcodes::IntCC::NotEqual,
291                null_flag,
292                0,
293            );
294            let set_byte = builder.ins().bor(current_byte, mask);
295            let clear_byte = builder.ins().band(current_byte, inv_mask);
296            let new_byte = builder.ins().select(is_null, set_byte, clear_byte);
297            builder
298                .ins()
299                .store(mem_flags, new_byte, row_ptr, byte_offset);
300        }
301    } else {
302        // Non-nullable: ensure the bit is clear (valid).
303        let current_byte = builder
304            .ins()
305            .load(cl_types::I8, mem_flags, row_ptr, byte_offset);
306        let inv_mask = builder.ins().iconst(cl_types::I8, !(1_i64 << bit_idx));
307        let new_byte = builder.ins().band(current_byte, inv_mask);
308        builder
309            .ins()
310            .store(mem_flags, new_byte, row_ptr, byte_offset);
311    }
312}
313
#[cfg(test)]
#[allow(clippy::similar_names, clippy::approx_constant)]
mod tests {
    use super::*;
    use crate::compiler::pipeline::{PipelineAction, PipelineId};
    use crate::compiler::row::{EventRow, MutableEventRow, RowSchema};
    use arrow_schema::{DataType, Field, Schema};
    use bumpalo::Bump;
    use datafusion_expr::{col, lit};

    /// Builds an Arrow schema from `(name, data type, nullable)` triples.
    fn make_schema(fields: Vec<(&str, DataType, bool)>) -> Arc<arrow_schema::Schema> {
        let arrow_fields: Vec<Field> = fields
            .into_iter()
            .map(|(name, dt, nullable)| Field::new(name, dt, nullable))
            .collect();
        Arc::new(Schema::new(arrow_fields))
    }

    /// Builds a row in `arena` with the given `(field, i64)` values and with
    /// the listed field indices marked null, returning its raw bytes.
    fn make_row_bytes<'a>(
        arena: &'a Bump,
        schema: &'a RowSchema,
        values: &[(usize, i64)],
        nulls: &[usize],
    ) -> &'a [u8] {
        let mut row = MutableEventRow::new_in(arena, schema, 0);
        for (field, value) in values.iter().copied() {
            row.set_i64(field, value);
        }
        for field in nulls.iter().copied() {
            row.set_null(field, true);
        }
        row.freeze().data()
    }

    /// Runs `compiled` on one input row, writing into `output`.
    fn run(compiled: &CompiledPipeline, input: &[u8], output: &mut [u8]) -> PipelineAction {
        // SAFETY: every caller in this module builds `input` from the
        // pipeline's input schema and sizes `output` with `min_row_size()`.
        unsafe { compiled.execute(input.as_ptr(), output.as_mut_ptr()) }
    }

    #[test]
    fn compile_filter_only_pipeline() {
        let schema = make_schema(vec![("x", DataType::Int64, false)]);
        let pipeline = Pipeline {
            id: PipelineId(0),
            stages: vec![PipelineStage::Filter {
                predicate: col("x").gt(lit(10_i64)),
            }],
            input_schema: Arc::clone(&schema),
            output_schema: schema,
        };

        let mut jit = JitContext::new().unwrap();
        let mut compiler = PipelineCompiler::new(&mut jit);
        let compiled = compiler.compile(&pipeline).unwrap();

        let arena = Bump::new();
        let row_schema = &compiled.input_schema;
        let mut output = vec![0u8; row_schema.min_row_size()];

        // x = 20 satisfies `x > 10`, so the row is emitted.
        let passing = make_row_bytes(&arena, row_schema, &[(0, 20)], &[]);
        assert_eq!(run(&compiled, passing, &mut output), PipelineAction::Emit);

        // x = 5 does not, so the row is dropped.
        let failing = make_row_bytes(&arena, row_schema, &[(0, 5)], &[]);
        assert_eq!(run(&compiled, failing, &mut output), PipelineAction::Drop);
    }

    #[test]
    fn compile_project_only_pipeline() {
        let input_schema = make_schema(vec![
            ("a", DataType::Int64, false),
            ("b", DataType::Int64, false),
        ]);
        let output_schema = make_schema(vec![
            ("a", DataType::Int64, false),
            ("a_plus_b", DataType::Int64, false),
        ]);

        let pipeline = Pipeline {
            id: PipelineId(0),
            stages: vec![PipelineStage::Project {
                expressions: vec![
                    (col("a"), "a".to_string()),
                    (col("a") + col("b"), "a_plus_b".to_string()),
                ],
            }],
            input_schema,
            output_schema: Arc::clone(&output_schema),
        };

        let mut jit = JitContext::new().unwrap();
        let mut compiler = PipelineCompiler::new(&mut jit);
        let compiled = compiler.compile(&pipeline).unwrap();

        let arena = Bump::new();
        let in_rs = &compiled.input_schema;
        let out_rs = &compiled.output_schema;

        let input = make_row_bytes(&arena, in_rs, &[(0, 30), (1, 12)], &[]);
        let mut output_buf = vec![0u8; out_rs.min_row_size()];
        assert_eq!(run(&compiled, input, &mut output_buf), PipelineAction::Emit);

        // Field 0 is `a` (30); field 1 is `a + b` (42).
        let out_row = EventRow::new(&output_buf, out_rs);
        assert_eq!(out_row.get_i64(0), 30);
        assert_eq!(out_row.get_i64(1), 42);
    }

    #[test]
    fn compile_filter_then_project_pipeline() {
        let input_schema = make_schema(vec![
            ("x", DataType::Int64, false),
            ("y", DataType::Int64, false),
        ]);
        let output_schema = make_schema(vec![
            ("x", DataType::Int64, false),
            ("y_doubled", DataType::Int64, false),
        ]);

        let pipeline = Pipeline {
            id: PipelineId(0),
            stages: vec![
                PipelineStage::Filter {
                    predicate: col("x").gt(lit(0_i64)),
                },
                PipelineStage::Project {
                    expressions: vec![
                        (col("x"), "x".to_string()),
                        (col("y") * lit(2_i64), "y_doubled".to_string()),
                    ],
                },
            ],
            input_schema,
            output_schema: Arc::clone(&output_schema),
        };

        let mut jit = JitContext::new().unwrap();
        let mut compiler = PipelineCompiler::new(&mut jit);
        let compiled = compiler.compile(&pipeline).unwrap();

        let arena = Bump::new();
        let in_rs = &compiled.input_schema;
        let out_rs = &compiled.output_schema;
        let mut output_buf = vec![0u8; out_rs.min_row_size()];

        // x=5, y=21 passes the filter and projects to (x=5, y_doubled=42).
        let passing = make_row_bytes(&arena, in_rs, &[(0, 5), (1, 21)], &[]);
        assert_eq!(
            run(&compiled, passing, &mut output_buf),
            PipelineAction::Emit
        );
        let out_row = EventRow::new(&output_buf, out_rs);
        assert_eq!(out_row.get_i64(0), 5);
        assert_eq!(out_row.get_i64(1), 42);

        // x=-1 fails the filter, so the row is dropped.
        let failing = make_row_bytes(&arena, in_rs, &[(0, -1), (1, 21)], &[]);
        assert_eq!(
            run(&compiled, failing, &mut output_buf),
            PipelineAction::Drop
        );
    }

    #[test]
    fn compile_key_extract_pipeline() {
        let input_schema = make_schema(vec![
            ("key", DataType::Int64, false),
            ("val", DataType::Int64, false),
        ]);
        let output_schema = make_schema(vec![("key", DataType::Int64, false)]);

        let pipeline = Pipeline {
            id: PipelineId(0),
            stages: vec![PipelineStage::KeyExtract {
                key_exprs: vec![col("key")],
            }],
            input_schema,
            output_schema: Arc::clone(&output_schema),
        };

        let mut jit = JitContext::new().unwrap();
        let mut compiler = PipelineCompiler::new(&mut jit);
        let compiled = compiler.compile(&pipeline).unwrap();

        let arena = Bump::new();
        let in_rs = &compiled.input_schema;
        let out_rs = &compiled.output_schema;

        let input = make_row_bytes(&arena, in_rs, &[(0, 42), (1, 99)], &[]);
        let mut output_buf = vec![0u8; out_rs.min_row_size()];
        assert_eq!(run(&compiled, input, &mut output_buf), PipelineAction::Emit);

        let out_row = EventRow::new(&output_buf, out_rs);
        assert_eq!(out_row.get_i64(0), 42);
    }

    #[test]
    fn compile_null_handling_in_filter() {
        let input_schema = make_schema(vec![("x", DataType::Int64, true)]);

        let pipeline = Pipeline {
            id: PipelineId(0),
            stages: vec![PipelineStage::Filter {
                predicate: col("x").gt(lit(10_i64)),
            }],
            input_schema: Arc::clone(&input_schema),
            output_schema: input_schema,
        };

        let mut jit = JitContext::new().unwrap();
        let mut compiler = PipelineCompiler::new(&mut jit);
        let compiled = compiler.compile(&pipeline).unwrap();

        let arena = Bump::new();
        let in_rs = &compiled.input_schema;

        // A null `x` makes the predicate null, which drops the row.
        let input = make_row_bytes(&arena, in_rs, &[(0, 0)], &[0]);
        let mut output_buf = vec![0u8; in_rs.min_row_size()];
        assert_eq!(run(&compiled, input, &mut output_buf), PipelineAction::Drop);
    }

    #[test]
    fn compile_null_propagation_in_project() {
        let input_schema = make_schema(vec![
            ("x", DataType::Int64, true),
            ("y", DataType::Int64, false),
        ]);
        let output_schema = make_schema(vec![("sum", DataType::Int64, true)]);

        let pipeline = Pipeline {
            id: PipelineId(0),
            stages: vec![PipelineStage::Project {
                expressions: vec![(col("x") + col("y"), "sum".to_string())],
            }],
            input_schema,
            output_schema: Arc::clone(&output_schema),
        };

        let mut jit = JitContext::new().unwrap();
        let mut compiler = PipelineCompiler::new(&mut jit);
        let compiled = compiler.compile(&pipeline).unwrap();

        let arena = Bump::new();
        let in_rs = &compiled.input_schema;
        let out_rs = &compiled.output_schema;

        // `x` is null, so `x + y` must come out null.
        let input = make_row_bytes(&arena, in_rs, &[(0, 0), (1, 10)], &[0]);
        let mut output_buf = vec![0u8; out_rs.min_row_size()];
        assert_eq!(run(&compiled, input, &mut output_buf), PipelineAction::Emit);

        let out_row = EventRow::new(&output_buf, out_rs);
        assert!(out_row.is_null(0));
    }

    #[test]
    fn compile_multi_type_project() {
        let input_schema = make_schema(vec![
            ("i", DataType::Int64, false),
            ("f", DataType::Float64, false),
        ]);
        let output_schema = make_schema(vec![
            ("i", DataType::Int64, false),
            ("f", DataType::Float64, false),
        ]);

        let pipeline = Pipeline {
            id: PipelineId(0),
            stages: vec![PipelineStage::Project {
                expressions: vec![
                    (col("i") + lit(1_i64), "i".to_string()),
                    (col("f"), "f".to_string()),
                ],
            }],
            input_schema,
            output_schema: Arc::clone(&output_schema),
        };

        let mut jit = JitContext::new().unwrap();
        let mut compiler = PipelineCompiler::new(&mut jit);
        let compiled = compiler.compile(&pipeline).unwrap();

        let arena = Bump::new();
        let in_rs = &compiled.input_schema;
        let out_rs = &compiled.output_schema;

        // Mixed-type row, so it is built by hand instead of via `make_row_bytes`.
        let mut in_row = MutableEventRow::new_in(&arena, in_rs, 0);
        in_row.set_i64(0, 41);
        in_row.set_f64(1, 3.14);
        let frozen = in_row.freeze();

        let mut output_buf = vec![0u8; out_rs.min_row_size()];
        assert_eq!(
            run(&compiled, frozen.data(), &mut output_buf),
            PipelineAction::Emit
        );

        let out_row = EventRow::new(&output_buf, out_rs);
        assert_eq!(out_row.get_i64(0), 42);
        assert!((out_row.get_f64(1) - 3.14).abs() < f64::EPSILON);
    }

    #[test]
    fn compile_empty_pipeline_always_emits() {
        let schema = make_schema(vec![("x", DataType::Int64, false)]);
        let pipeline = Pipeline {
            id: PipelineId(0),
            stages: vec![],
            input_schema: Arc::clone(&schema),
            output_schema: schema,
        };

        let mut jit = JitContext::new().unwrap();
        let mut compiler = PipelineCompiler::new(&mut jit);
        let compiled = compiler.compile(&pipeline).unwrap();

        let arena = Bump::new();
        let in_rs = &compiled.input_schema;
        let input = make_row_bytes(&arena, in_rs, &[(0, 42)], &[]);
        let mut output_buf = vec![0u8; in_rs.min_row_size()];
        assert_eq!(run(&compiled, input, &mut output_buf), PipelineAction::Emit);
    }

    #[test]
    fn compile_multiple_pipelines_same_jit() {
        let schema = make_schema(vec![("x", DataType::Int64, false)]);

        let p1 = Pipeline {
            id: PipelineId(0),
            stages: vec![PipelineStage::Filter {
                predicate: col("x").gt(lit(10_i64)),
            }],
            input_schema: Arc::clone(&schema),
            output_schema: Arc::clone(&schema),
        };
        let p2 = Pipeline {
            id: PipelineId(1),
            stages: vec![PipelineStage::Filter {
                predicate: col("x").lt(lit(100_i64)),
            }],
            input_schema: Arc::clone(&schema),
            output_schema: schema,
        };

        // Both pipelines go through the same compiler and JIT context.
        let mut jit = JitContext::new().unwrap();
        let mut compiler = PipelineCompiler::new(&mut jit);
        let c1 = compiler.compile(&p1).unwrap();
        let c2 = compiler.compile(&p2).unwrap();

        let arena = Bump::new();
        let in_rs = &c1.input_schema;

        let input = make_row_bytes(&arena, in_rs, &[(0, 50)], &[]);
        let mut output = vec![0u8; in_rs.min_row_size()];

        let a1 = run(&c1, input, &mut output);
        let a2 = run(&c2, input, &mut output);
        assert_eq!(a1, PipelineAction::Emit);
        assert_eq!(a2, PipelineAction::Emit);
    }

    #[test]
    fn compile_chained_filters() {
        let schema = make_schema(vec![("x", DataType::Int64, false)]);

        let pipeline = Pipeline {
            id: PipelineId(0),
            stages: vec![
                PipelineStage::Filter {
                    predicate: col("x").gt(lit(0_i64)),
                },
                PipelineStage::Filter {
                    predicate: col("x").lt(lit(100_i64)),
                },
            ],
            input_schema: Arc::clone(&schema),
            output_schema: schema,
        };

        let mut jit = JitContext::new().unwrap();
        let mut compiler = PipelineCompiler::new(&mut jit);
        let compiled = compiler.compile(&pipeline).unwrap();

        let arena = Bump::new();
        let in_rs = &compiled.input_schema;
        let mut output = vec![0u8; in_rs.min_row_size()];

        let cases = [
            (50, PipelineAction::Emit),  // passes both filters
            (-5, PipelineAction::Drop),  // fails the first filter
            (200, PipelineAction::Drop), // passes the first, fails the second
        ];
        for (x, expected) in cases {
            let input = make_row_bytes(&arena, in_rs, &[(0, x)], &[]);
            assert_eq!(run(&compiled, input, &mut output), expected, "x = {x}");
        }
    }
}