1use std::sync::Arc;
15
16use cranelift_codegen::ir::types::{self as cl_types};
17use cranelift_codegen::ir::{AbiParam, Function, InstBuilder, MemFlags, UserFuncName, Value};
18use cranelift_codegen::Context;
19use cranelift_frontend::FunctionBuilder;
20use cranelift_module::Module;
21
22use super::error::CompileError;
23use super::expr::{compile_expr_inner, CompiledValue};
24use super::jit::JitContext;
25use super::pipeline::{CompiledPipeline, Pipeline, PipelineFn, PipelineStage};
26use super::row::RowSchema;
27
28const PTR_TYPE: cranelift_codegen::ir::Type = cl_types::I64;
30
31pub struct PipelineCompiler<'a> {
33 jit: &'a mut JitContext,
34}
35
36impl<'a> PipelineCompiler<'a> {
37 pub fn new(jit: &'a mut JitContext) -> Self {
39 Self { jit }
40 }
41
42 pub fn compile(&mut self, pipeline: &Pipeline) -> Result<CompiledPipeline, CompileError> {
48 let input_schema = RowSchema::from_arrow(&pipeline.input_schema)
49 .map_err(|e| CompileError::UnsupportedExpr(format!("input schema: {e}")))?;
50 let output_schema = RowSchema::from_arrow(&pipeline.output_schema)
51 .map_err(|e| CompileError::UnsupportedExpr(format!("output schema: {e}")))?;
52
53 let func_ptr = self.compile_function(pipeline, &input_schema, &output_schema)?;
54
55 Ok(CompiledPipeline::new(
56 pipeline.id,
57 func_ptr,
58 Arc::new(input_schema),
59 Arc::new(output_schema),
60 ))
61 }
62
63 fn compile_function(
65 &mut self,
66 pipeline: &Pipeline,
67 input_schema: &RowSchema,
68 output_schema: &RowSchema,
69 ) -> Result<PipelineFn, CompileError> {
70 let func_name = self
71 .jit
72 .next_func_name(&format!("pipeline_{}", pipeline.id.0));
73
74 let mut sig = self.jit.module().make_signature();
75 sig.params.push(AbiParam::new(PTR_TYPE)); sig.params.push(AbiParam::new(PTR_TYPE)); sig.returns.push(AbiParam::new(cl_types::I8)); let func_id = self.jit.module().declare_function(
80 &func_name,
81 cranelift_module::Linkage::Local,
82 &sig,
83 )?;
84
85 let mut func = Function::with_name_signature(UserFuncName::testcase(&func_name), sig);
86
87 {
88 let builder_ctx = self.jit.builder_ctx();
89 let mut builder = FunctionBuilder::new(&mut func, builder_ctx);
90
91 let entry = builder.create_block();
92 builder.append_block_params_for_function_params(entry);
93 builder.switch_to_block(entry);
94 builder.seal_block(entry);
95
96 let input_ptr = builder.block_params(entry)[0];
97 let output_ptr = builder.block_params(entry)[1];
98
99 let drop_block = builder.create_block();
101
102 for stage in &pipeline.stages {
104 match stage {
105 PipelineStage::Filter { predicate } => {
106 emit_filter_stage(
107 &mut builder,
108 input_schema,
109 predicate,
110 input_ptr,
111 drop_block,
112 )?;
113 }
114 PipelineStage::Project { expressions } => {
115 emit_project_stage(
116 &mut builder,
117 input_schema,
118 output_schema,
119 expressions,
120 input_ptr,
121 output_ptr,
122 )?;
123 }
124 PipelineStage::KeyExtract { key_exprs } => {
125 emit_key_extract_stage(
126 &mut builder,
127 input_schema,
128 output_schema,
129 key_exprs,
130 input_ptr,
131 output_ptr,
132 )?;
133 }
134 }
135 }
136
137 let emit_val = builder.ins().iconst(cl_types::I8, 1);
139 builder.ins().return_(&[emit_val]);
140
141 builder.switch_to_block(drop_block);
143 builder.seal_block(drop_block);
144 let drop_val = builder.ins().iconst(cl_types::I8, 0);
145 builder.ins().return_(&[drop_val]);
146
147 builder.finalize();
148 }
149
150 let mut ctx = Context::for_function(func);
151 self.jit
152 .module()
153 .define_function(func_id, &mut ctx)
154 .map_err(|e| CompileError::Cranelift(Box::new(e)))?;
155 self.jit.module().finalize_definitions().unwrap();
156
157 let code_ptr = self.jit.module().get_finalized_function(func_id);
158 Ok(unsafe { std::mem::transmute::<*const u8, PipelineFn>(code_ptr) })
160 }
161}
162
163fn emit_filter_stage(
165 builder: &mut FunctionBuilder,
166 input_schema: &RowSchema,
167 predicate: &datafusion_expr::Expr,
168 input_ptr: Value,
169 drop_block: cranelift_codegen::ir::Block,
170) -> Result<(), CompileError> {
171 let compiled = compile_expr_inner(builder, input_schema, predicate, input_ptr)?;
172
173 let result = if compiled.is_nullable {
175 if let Some(null_flag) = compiled.null_flag {
176 let zero = builder.ins().iconst(cl_types::I8, 0);
177 let is_null = builder.ins().icmp_imm(
178 cranelift_codegen::ir::condcodes::IntCC::NotEqual,
179 null_flag,
180 0,
181 );
182 builder.ins().select(is_null, zero, compiled.value)
183 } else {
184 compiled.value
185 }
186 } else {
187 compiled.value
188 };
189
190 let continue_block = builder.create_block();
192 builder
193 .ins()
194 .brif(result, continue_block, &[], drop_block, &[]);
195 builder.switch_to_block(continue_block);
196 builder.seal_block(continue_block);
197
198 Ok(())
199}
200
201fn emit_project_stage(
203 builder: &mut FunctionBuilder,
204 input_schema: &RowSchema,
205 output_schema: &RowSchema,
206 expressions: &[(datafusion_expr::Expr, String)],
207 input_ptr: Value,
208 output_ptr: Value,
209) -> Result<(), CompileError> {
210 let mem_flags = MemFlags::trusted();
211
212 for (i, (expr, _name)) in expressions.iter().enumerate() {
213 let compiled = compile_expr_inner(builder, input_schema, expr, input_ptr)?;
214
215 let out_layout = output_schema.field(i);
217 #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
218 let out_offset = out_layout.offset as i32;
219 builder
220 .ins()
221 .store(mem_flags, compiled.value, output_ptr, out_offset);
222
223 write_null_bit(builder, output_schema, i, output_ptr, &compiled);
225 }
226
227 Ok(())
228}
229
230fn emit_key_extract_stage(
232 builder: &mut FunctionBuilder,
233 input_schema: &RowSchema,
234 output_schema: &RowSchema,
235 key_exprs: &[datafusion_expr::Expr],
236 input_ptr: Value,
237 output_ptr: Value,
238) -> Result<(), CompileError> {
239 let mem_flags = MemFlags::trusted();
240
241 for (i, expr) in key_exprs.iter().enumerate() {
242 if i >= output_schema.field_count() {
243 break;
244 }
245
246 let compiled = compile_expr_inner(builder, input_schema, expr, input_ptr)?;
247
248 let out_layout = output_schema.field(i);
249 #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
250 let out_offset = out_layout.offset as i32;
251 builder
252 .ins()
253 .store(mem_flags, compiled.value, output_ptr, out_offset);
254
255 write_null_bit(builder, output_schema, i, output_ptr, &compiled);
256 }
257
258 Ok(())
259}
260
261fn write_null_bit(
263 builder: &mut FunctionBuilder,
264 schema: &RowSchema,
265 field_idx: usize,
266 row_ptr: Value,
267 compiled: &CompiledValue,
268) {
269 let layout = schema.field(field_idx);
270 let null_bit = layout.null_bit;
271 let byte_idx = RowSchema::header_size() + null_bit / 8;
272 let bit_idx = null_bit % 8;
273
274 let mem_flags = MemFlags::trusted();
275 #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
276 let byte_offset = byte_idx as i32;
277
278 if compiled.is_nullable {
279 if let Some(null_flag) = compiled.null_flag {
280 let current_byte = builder
282 .ins()
283 .load(cl_types::I8, mem_flags, row_ptr, byte_offset);
284
285 let mask = builder.ins().iconst(cl_types::I8, 1 << bit_idx);
286 let inv_mask = builder.ins().iconst(cl_types::I8, !(1_i64 << bit_idx));
287
288 let is_null = builder.ins().icmp_imm(
290 cranelift_codegen::ir::condcodes::IntCC::NotEqual,
291 null_flag,
292 0,
293 );
294 let set_byte = builder.ins().bor(current_byte, mask);
295 let clear_byte = builder.ins().band(current_byte, inv_mask);
296 let new_byte = builder.ins().select(is_null, set_byte, clear_byte);
297 builder
298 .ins()
299 .store(mem_flags, new_byte, row_ptr, byte_offset);
300 }
301 } else {
302 let current_byte = builder
304 .ins()
305 .load(cl_types::I8, mem_flags, row_ptr, byte_offset);
306 let inv_mask = builder.ins().iconst(cl_types::I8, !(1_i64 << bit_idx));
307 let new_byte = builder.ins().band(current_byte, inv_mask);
308 builder
309 .ins()
310 .store(mem_flags, new_byte, row_ptr, byte_offset);
311 }
312}
313
#[cfg(test)]
#[allow(clippy::similar_names, clippy::approx_constant)]
mod tests {
    use super::*;
    use crate::compiler::pipeline::{PipelineAction, PipelineId};
    use crate::compiler::row::{EventRow, MutableEventRow, RowSchema};
    use arrow_schema::{DataType, Field, Schema};
    use bumpalo::Bump;
    use datafusion_expr::{col, lit};

    /// Builds an Arrow schema from `(name, type, nullable)` triples.
    fn make_schema(fields: Vec<(&str, DataType, bool)>) -> Arc<arrow_schema::Schema> {
        let arrow_fields: Vec<Field> = fields
            .into_iter()
            .map(|(name, data_type, nullable)| Field::new(name, data_type, nullable))
            .collect();
        Arc::new(Schema::new(arrow_fields))
    }

    /// Builds a frozen row in `arena` with the given `(field, i64)` values and
    /// the listed fields marked null, and returns its raw bytes.
    fn make_row_bytes<'a>(
        arena: &'a Bump,
        schema: &'a RowSchema,
        values: &[(usize, i64)],
        nulls: &[usize],
    ) -> &'a [u8] {
        let mut row = MutableEventRow::new_in(arena, schema, 0);
        for &(field, value) in values {
            row.set_i64(field, value);
        }
        for &field in nulls {
            row.set_null(field, true);
        }
        row.freeze().data()
    }

    /// Compiles `pipeline` with a fresh compiler borrowing `jit`.
    fn compile_with(jit: &mut JitContext, pipeline: &Pipeline) -> CompiledPipeline {
        PipelineCompiler::new(jit).compile(pipeline).unwrap()
    }

    /// Executes `compiled` on one input row, writing into `output`.
    fn run(compiled: &CompiledPipeline, input: &[u8], output: &mut [u8]) -> PipelineAction {
        // SAFETY: every caller passes an input row built for the pipeline's
        // input schema and an output buffer of `min_row_size()` bytes for the
        // schema the pipeline writes to.
        unsafe { compiled.execute(input.as_ptr(), output.as_mut_ptr()) }
    }

    #[test]
    fn compile_filter_only_pipeline() {
        let schema = make_schema(vec![("x", DataType::Int64, false)]);
        let pipeline = Pipeline {
            id: PipelineId(0),
            stages: vec![PipelineStage::Filter {
                predicate: col("x").gt(lit(10_i64)),
            }],
            input_schema: Arc::clone(&schema),
            output_schema: schema,
        };

        let mut jit = JitContext::new().unwrap();
        let compiled = compile_with(&mut jit, &pipeline);

        let arena = Bump::new();
        let row_schema = &compiled.input_schema;
        let mut output = vec![0u8; row_schema.min_row_size()];

        // x = 20 satisfies x > 10.
        let passing = make_row_bytes(&arena, row_schema, &[(0, 20)], &[]);
        assert_eq!(run(&compiled, passing, &mut output), PipelineAction::Emit);

        // x = 5 does not.
        let failing = make_row_bytes(&arena, row_schema, &[(0, 5)], &[]);
        assert_eq!(run(&compiled, failing, &mut output), PipelineAction::Drop);
    }

    #[test]
    fn compile_project_only_pipeline() {
        let input_schema = make_schema(vec![
            ("a", DataType::Int64, false),
            ("b", DataType::Int64, false),
        ]);
        let output_schema = make_schema(vec![
            ("a", DataType::Int64, false),
            ("a_plus_b", DataType::Int64, false),
        ]);

        let pipeline = Pipeline {
            id: PipelineId(0),
            stages: vec![PipelineStage::Project {
                expressions: vec![
                    (col("a"), "a".to_string()),
                    (col("a") + col("b"), "a_plus_b".to_string()),
                ],
            }],
            input_schema,
            output_schema: Arc::clone(&output_schema),
        };

        let mut jit = JitContext::new().unwrap();
        let compiled = compile_with(&mut jit, &pipeline);

        let arena = Bump::new();
        let in_rs = &compiled.input_schema;
        let out_rs = &compiled.output_schema;

        let input = make_row_bytes(&arena, in_rs, &[(0, 30), (1, 12)], &[]);
        let mut output_buf = vec![0u8; out_rs.min_row_size()];
        assert_eq!(run(&compiled, input, &mut output_buf), PipelineAction::Emit);

        let out_row = EventRow::new(&output_buf, out_rs);
        assert_eq!(out_row.get_i64(0), 30);
        assert_eq!(out_row.get_i64(1), 42);
    }

    #[test]
    fn compile_filter_then_project_pipeline() {
        let input_schema = make_schema(vec![
            ("x", DataType::Int64, false),
            ("y", DataType::Int64, false),
        ]);
        let output_schema = make_schema(vec![
            ("x", DataType::Int64, false),
            ("y_doubled", DataType::Int64, false),
        ]);

        let pipeline = Pipeline {
            id: PipelineId(0),
            stages: vec![
                PipelineStage::Filter {
                    predicate: col("x").gt(lit(0_i64)),
                },
                PipelineStage::Project {
                    expressions: vec![
                        (col("x"), "x".to_string()),
                        (col("y") * lit(2_i64), "y_doubled".to_string()),
                    ],
                },
            ],
            input_schema,
            output_schema: Arc::clone(&output_schema),
        };

        let mut jit = JitContext::new().unwrap();
        let compiled = compile_with(&mut jit, &pipeline);

        let arena = Bump::new();
        let in_rs = &compiled.input_schema;
        let out_rs = &compiled.output_schema;
        let mut output_buf = vec![0u8; out_rs.min_row_size()];

        // Positive x passes the filter and gets projected.
        let input = make_row_bytes(&arena, in_rs, &[(0, 5), (1, 21)], &[]);
        assert_eq!(run(&compiled, input, &mut output_buf), PipelineAction::Emit);
        let out_row = EventRow::new(&output_buf, out_rs);
        assert_eq!(out_row.get_i64(0), 5);
        assert_eq!(out_row.get_i64(1), 42);

        // Negative x is rejected by the filter.
        let input = make_row_bytes(&arena, in_rs, &[(0, -1), (1, 21)], &[]);
        assert_eq!(run(&compiled, input, &mut output_buf), PipelineAction::Drop);
    }

    #[test]
    fn compile_key_extract_pipeline() {
        let input_schema = make_schema(vec![
            ("key", DataType::Int64, false),
            ("val", DataType::Int64, false),
        ]);
        let output_schema = make_schema(vec![("key", DataType::Int64, false)]);

        let pipeline = Pipeline {
            id: PipelineId(0),
            stages: vec![PipelineStage::KeyExtract {
                key_exprs: vec![col("key")],
            }],
            input_schema,
            output_schema: Arc::clone(&output_schema),
        };

        let mut jit = JitContext::new().unwrap();
        let compiled = compile_with(&mut jit, &pipeline);

        let arena = Bump::new();
        let in_rs = &compiled.input_schema;
        let out_rs = &compiled.output_schema;

        let input = make_row_bytes(&arena, in_rs, &[(0, 42), (1, 99)], &[]);
        let mut output_buf = vec![0u8; out_rs.min_row_size()];
        assert_eq!(run(&compiled, input, &mut output_buf), PipelineAction::Emit);

        let out_row = EventRow::new(&output_buf, out_rs);
        assert_eq!(out_row.get_i64(0), 42);
    }

    #[test]
    fn compile_null_handling_in_filter() {
        let input_schema = make_schema(vec![("x", DataType::Int64, true)]);

        let pipeline = Pipeline {
            id: PipelineId(0),
            stages: vec![PipelineStage::Filter {
                predicate: col("x").gt(lit(10_i64)),
            }],
            input_schema: Arc::clone(&input_schema),
            output_schema: input_schema,
        };

        let mut jit = JitContext::new().unwrap();
        let compiled = compile_with(&mut jit, &pipeline);

        let arena = Bump::new();
        let in_rs = &compiled.input_schema;

        // x is null, so the predicate must not pass.
        let input = make_row_bytes(&arena, in_rs, &[(0, 0)], &[0]);
        let mut output_buf = vec![0u8; in_rs.min_row_size()];
        assert_eq!(run(&compiled, input, &mut output_buf), PipelineAction::Drop);
    }

    #[test]
    fn compile_null_propagation_in_project() {
        let input_schema = make_schema(vec![
            ("x", DataType::Int64, true),
            ("y", DataType::Int64, false),
        ]);
        let output_schema = make_schema(vec![("sum", DataType::Int64, true)]);

        let pipeline = Pipeline {
            id: PipelineId(0),
            stages: vec![PipelineStage::Project {
                expressions: vec![(col("x") + col("y"), "sum".to_string())],
            }],
            input_schema,
            output_schema: Arc::clone(&output_schema),
        };

        let mut jit = JitContext::new().unwrap();
        let compiled = compile_with(&mut jit, &pipeline);

        let arena = Bump::new();
        let in_rs = &compiled.input_schema;
        let out_rs = &compiled.output_schema;

        // x is null, so x + y must come out null.
        let input = make_row_bytes(&arena, in_rs, &[(0, 0), (1, 10)], &[0]);
        let mut output_buf = vec![0u8; out_rs.min_row_size()];
        assert_eq!(run(&compiled, input, &mut output_buf), PipelineAction::Emit);

        let out_row = EventRow::new(&output_buf, out_rs);
        assert!(out_row.is_null(0));
    }

    #[test]
    fn compile_multi_type_project() {
        let input_schema = make_schema(vec![
            ("i", DataType::Int64, false),
            ("f", DataType::Float64, false),
        ]);
        let output_schema = make_schema(vec![
            ("i", DataType::Int64, false),
            ("f", DataType::Float64, false),
        ]);

        let pipeline = Pipeline {
            id: PipelineId(0),
            stages: vec![PipelineStage::Project {
                expressions: vec![
                    (col("i") + lit(1_i64), "i".to_string()),
                    (col("f"), "f".to_string()),
                ],
            }],
            input_schema,
            output_schema: Arc::clone(&output_schema),
        };

        let mut jit = JitContext::new().unwrap();
        let compiled = compile_with(&mut jit, &pipeline);

        let arena = Bump::new();
        let in_rs = &compiled.input_schema;
        let out_rs = &compiled.output_schema;

        let mut in_row = MutableEventRow::new_in(&arena, in_rs, 0);
        in_row.set_i64(0, 41);
        in_row.set_f64(1, 3.14);
        let frozen = in_row.freeze();

        let mut output_buf = vec![0u8; out_rs.min_row_size()];
        assert_eq!(
            run(&compiled, frozen.data(), &mut output_buf),
            PipelineAction::Emit
        );

        let out_row = EventRow::new(&output_buf, out_rs);
        assert_eq!(out_row.get_i64(0), 42);
        assert!((out_row.get_f64(1) - 3.14).abs() < f64::EPSILON);
    }

    #[test]
    fn compile_empty_pipeline_always_emits() {
        let schema = make_schema(vec![("x", DataType::Int64, false)]);
        let pipeline = Pipeline {
            id: PipelineId(0),
            stages: vec![],
            input_schema: Arc::clone(&schema),
            output_schema: schema,
        };

        let mut jit = JitContext::new().unwrap();
        let compiled = compile_with(&mut jit, &pipeline);

        let arena = Bump::new();
        let in_rs = &compiled.input_schema;
        let input = make_row_bytes(&arena, in_rs, &[(0, 42)], &[]);
        let mut output_buf = vec![0u8; in_rs.min_row_size()];
        assert_eq!(run(&compiled, input, &mut output_buf), PipelineAction::Emit);
    }

    #[test]
    fn compile_multiple_pipelines_same_jit() {
        let schema = make_schema(vec![("x", DataType::Int64, false)]);

        let p1 = Pipeline {
            id: PipelineId(0),
            stages: vec![PipelineStage::Filter {
                predicate: col("x").gt(lit(10_i64)),
            }],
            input_schema: Arc::clone(&schema),
            output_schema: Arc::clone(&schema),
        };
        let p2 = Pipeline {
            id: PipelineId(1),
            stages: vec![PipelineStage::Filter {
                predicate: col("x").lt(lit(100_i64)),
            }],
            input_schema: Arc::clone(&schema),
            output_schema: schema,
        };

        // Both pipelines go through one compiler instance on one JIT context.
        let mut jit = JitContext::new().unwrap();
        let mut compiler = PipelineCompiler::new(&mut jit);
        let c1 = compiler.compile(&p1).unwrap();
        let c2 = compiler.compile(&p2).unwrap();

        let arena = Bump::new();
        let in_rs = &c1.input_schema;

        let input = make_row_bytes(&arena, in_rs, &[(0, 50)], &[]);
        let mut output = vec![0u8; in_rs.min_row_size()];

        let a1 = run(&c1, input, &mut output);
        let a2 = run(&c2, input, &mut output);
        assert_eq!(a1, PipelineAction::Emit);
        assert_eq!(a2, PipelineAction::Emit);
    }

    #[test]
    fn compile_chained_filters() {
        let schema = make_schema(vec![("x", DataType::Int64, false)]);

        let pipeline = Pipeline {
            id: PipelineId(0),
            stages: vec![
                PipelineStage::Filter {
                    predicate: col("x").gt(lit(0_i64)),
                },
                PipelineStage::Filter {
                    predicate: col("x").lt(lit(100_i64)),
                },
            ],
            input_schema: Arc::clone(&schema),
            output_schema: schema,
        };

        let mut jit = JitContext::new().unwrap();
        let compiled = compile_with(&mut jit, &pipeline);

        let arena = Bump::new();
        let in_rs = &compiled.input_schema;
        let mut output = vec![0u8; in_rs.min_row_size()];

        // Inside (0, 100): passes both filters.
        let input = make_row_bytes(&arena, in_rs, &[(0, 50)], &[]);
        assert_eq!(run(&compiled, input, &mut output), PipelineAction::Emit);

        // Fails the first filter.
        let input = make_row_bytes(&arena, in_rs, &[(0, -5)], &[]);
        assert_eq!(run(&compiled, input, &mut output), PipelineAction::Drop);

        // Passes the first filter, fails the second.
        let input = make_row_bytes(&arena, in_rs, &[(0, 200)], &[]);
        assert_eq!(run(&compiled, input, &mut output), PipelineAction::Drop);
    }
}