Skip to main content

laminar_core/compiler/
pipeline.rs

//! Pipeline abstraction for compiled Ring 0 event processing.
//!
//! A streaming SQL query decomposes into **pipelines** (compilable chains of
//! stateless operators) separated by **breakers** (stateful operators like
//! windows, joins, and aggregations). Each pipeline compiles into a single
//! native function that processes one event at a time with zero allocations.
//!
//! # Types
//!
//! - [`Pipeline`]: A chain of [`PipelineStage`]s with input/output schemas.
//! - [`PipelineBreaker`]: Stateful operators that cannot be fused.
//! - [`CompiledPipeline`]: A compiled native function with execution stats.
//! - [`PipelineAction`]: Result of executing a compiled pipeline (drop/emit/error).

15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::Arc;
17
18use arrow_schema::SchemaRef;
19use datafusion_expr::Expr;
20
21use super::row::RowSchema;
22
/// Numeric handle that identifies a single pipeline segment.
///
/// Two segments are the same pipeline exactly when their wrapped `u32`
/// values are equal; the type is `Copy` and hashable so it can be used
/// freely as a map key.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub struct PipelineId(pub u32);
26
27/// A single stage within a compilable pipeline.
28#[derive(Debug, Clone)]
29pub enum PipelineStage {
30    /// Filter rows by a boolean predicate.
31    Filter {
32        /// The predicate expression. Rows where this evaluates to false/null are dropped.
33        predicate: Expr,
34    },
35    /// Project (compute) new columns from expressions.
36    Project {
37        /// Each entry is `(expression, output_column_name)`.
38        expressions: Vec<(Expr, String)>,
39    },
40    /// Extract key columns for downstream grouping or partitioning.
41    KeyExtract {
42        /// Expressions that compute the key fields.
43        key_exprs: Vec<Expr>,
44    },
45}
46
47/// Stateful operators that break pipeline fusion.
48///
49/// A breaker sits between two pipelines: it consumes output from the upstream
50/// pipeline and feeds input to the downstream pipeline.
51#[derive(Debug, Clone)]
52pub enum PipelineBreaker {
53    /// GROUP BY aggregation.
54    Aggregate {
55        /// Group-by key expressions.
56        group_exprs: Vec<Expr>,
57        /// Aggregate function expressions.
58        aggr_exprs: Vec<Expr>,
59    },
60    /// ORDER BY (requires full materialization).
61    Sort {
62        /// Sort key expressions.
63        order_exprs: Vec<Expr>,
64    },
65    /// Stream-stream or lookup join.
66    Join {
67        /// The type of join (inner, left, right, full).
68        join_type: String,
69        /// Left-side join key expressions.
70        left_keys: Vec<Expr>,
71        /// Right-side join key expressions.
72        right_keys: Vec<Expr>,
73    },
74    /// Terminal sink (no downstream pipeline).
75    Sink,
76}
77
78/// A compilable pipeline: a chain of stateless stages with known schemas.
79#[derive(Debug, Clone)]
80pub struct Pipeline {
81    /// Unique pipeline identifier.
82    pub id: PipelineId,
83    /// Ordered list of stages to execute.
84    pub stages: Vec<PipelineStage>,
85    /// Schema of rows entering this pipeline.
86    pub input_schema: SchemaRef,
87    /// Schema of rows leaving this pipeline.
88    pub output_schema: SchemaRef,
89}
90
/// Result of executing a compiled pipeline on one event row.
///
/// The discriminants are the raw codes a [`PipelineFn`] returns, so the
/// `#[repr(u8)]` values must stay in sync with the code generator.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum PipelineAction {
    /// Row was filtered out (dropped).
    Drop = 0,
    /// Row passed all stages and should be emitted.
    Emit = 1,
    /// An error occurred during execution.
    Error = 2,
}

impl PipelineAction {
    /// Converts a raw `u8` return value to a [`PipelineAction`].
    ///
    /// `0` maps to [`Drop`](Self::Drop) and `1` to [`Emit`](Self::Emit). Every
    /// other value, whether `2` or an unexpected code, maps to
    /// [`Error`](Self::Error). A misbehaving compiled function can therefore
    /// never be mistaken for a successful emit.
    ///
    /// This is `const` so it can be used in constant contexts. It is also
    /// `#[inline]` because it runs once per event on the execution hot path.
    #[must_use]
    #[inline]
    pub const fn from_u8(v: u8) -> Self {
        match v {
            0 => Self::Drop,
            1 => Self::Emit,
            _ => Self::Error,
        }
    }
}

impl From<u8> for PipelineAction {
    /// Decodes a raw return code; identical to [`PipelineAction::from_u8`].
    #[inline]
    fn from(v: u8) -> Self {
        Self::from_u8(v)
    }
}
114
/// Native entry point of a compiled pipeline.
///
/// Conceptually `fn(input_row: *const u8, output_row: *mut u8) -> u8`:
///
/// - first argument: the input [`EventRow`](super::row::EventRow) byte buffer;
/// - second argument: the output row buffer, which the caller must allocate
///   beforehand;
/// - return value: a raw action code (`0` = Drop, `1` = Emit, `2` = Error),
///   decoded by [`PipelineAction::from_u8`].
pub type PipelineFn = unsafe extern "C" fn(*const u8, *mut u8) -> u8;
123
124/// Runtime statistics for a compiled pipeline.
125#[derive(Debug)]
126pub struct PipelineStats {
127    /// Total events processed.
128    pub events_processed: AtomicU64,
129    /// Events that passed all stages (emitted).
130    pub events_emitted: AtomicU64,
131    /// Events filtered out (dropped).
132    pub events_dropped: AtomicU64,
133    /// Cumulative execution time in nanoseconds.
134    pub total_ns: AtomicU64,
135}
136
137impl PipelineStats {
138    /// Creates a new zeroed stats instance.
139    #[must_use]
140    pub fn new() -> Self {
141        Self {
142            events_processed: AtomicU64::new(0),
143            events_emitted: AtomicU64::new(0),
144            events_dropped: AtomicU64::new(0),
145            total_ns: AtomicU64::new(0),
146        }
147    }
148
149    /// Records a single execution result.
150    pub fn record(&self, action: PipelineAction, elapsed_ns: u64) {
151        self.events_processed.fetch_add(1, Ordering::Relaxed);
152        match action {
153            PipelineAction::Emit => {
154                self.events_emitted.fetch_add(1, Ordering::Relaxed);
155            }
156            PipelineAction::Drop => {
157                self.events_dropped.fetch_add(1, Ordering::Relaxed);
158            }
159            PipelineAction::Error => {}
160        }
161        self.total_ns.fetch_add(elapsed_ns, Ordering::Relaxed);
162    }
163}
164
165impl Default for PipelineStats {
166    fn default() -> Self {
167        Self::new()
168    }
169}
170
171/// A compiled pipeline ready for Ring 0 execution.
172pub struct CompiledPipeline {
173    /// Pipeline identifier.
174    pub id: PipelineId,
175    /// The compiled native function.
176    func: PipelineFn,
177    /// Schema for input rows.
178    pub input_schema: Arc<RowSchema>,
179    /// Schema for output rows.
180    pub output_schema: Arc<RowSchema>,
181    /// Runtime statistics.
182    pub stats: PipelineStats,
183}
184
185impl CompiledPipeline {
186    /// Creates a new compiled pipeline.
187    pub fn new(
188        id: PipelineId,
189        func: PipelineFn,
190        input_schema: Arc<RowSchema>,
191        output_schema: Arc<RowSchema>,
192    ) -> Self {
193        Self {
194            id,
195            func,
196            input_schema,
197            output_schema,
198            stats: PipelineStats::new(),
199        }
200    }
201
202    /// Executes the compiled pipeline on one input row.
203    ///
204    /// # Safety
205    ///
206    /// - `input` must point to a valid `EventRow` byte buffer matching `input_schema`.
207    /// - `output` must point to a buffer of at least `output_schema.min_row_size()` bytes.
208    #[inline]
209    pub unsafe fn execute(&self, input: *const u8, output: *mut u8) -> PipelineAction {
210        let result = (self.func)(input, output);
211        PipelineAction::from_u8(result)
212    }
213
214    /// Returns the compiled function pointer.
215    #[must_use]
216    pub fn func(&self) -> PipelineFn {
217        self.func
218    }
219}
220
221// SAFETY: The function pointer is valid across threads (compiled code is immutable).
222unsafe impl Send for CompiledPipeline {}
223unsafe impl Sync for CompiledPipeline {}
224
225impl std::fmt::Debug for CompiledPipeline {
226    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
227        f.debug_struct("CompiledPipeline")
228            .field("id", &self.id)
229            .field("func", &"<native fn>")
230            .finish_non_exhaustive()
231    }
232}
233
#[cfg(test)]
#[allow(clippy::items_after_statements, clippy::useless_vec)]
mod tests {
    use super::*;
    use arrow_schema::{DataType, Field, Schema};

    /// Builds a row schema with a single non-nullable `Int64` column `x`.
    fn int64_row_schema() -> Arc<RowSchema> {
        let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int64, false)]));
        Arc::new(RowSchema::from_arrow(&schema).unwrap())
    }

    /// Dummy compiled function: emits every row and never touches its buffers.
    unsafe extern "C" fn always_emit(_input: *const u8, _output: *mut u8) -> u8 {
        1
    }

    /// Dummy compiled function: drops every row and never touches its buffers.
    unsafe extern "C" fn always_drop(_input: *const u8, _output: *mut u8) -> u8 {
        0
    }

    /// Executes `compiled` once on zeroed buffers sized from its own schemas,
    /// as the `execute` safety contract requires.
    fn run_once(compiled: &CompiledPipeline) -> PipelineAction {
        let input = vec![0u8; compiled.input_schema.min_row_size()];
        let mut output = vec![0u8; compiled.output_schema.min_row_size()];
        // SAFETY: both buffers are sized from the pipeline's own schemas, and the
        // dummy functions above never dereference them.
        unsafe { compiled.execute(input.as_ptr(), output.as_mut_ptr()) }
    }

    #[test]
    fn pipeline_action_repr() {
        assert_eq!(PipelineAction::Drop as u8, 0);
        assert_eq!(PipelineAction::Emit as u8, 1);
        assert_eq!(PipelineAction::Error as u8, 2);
    }

    #[test]
    fn pipeline_action_from_u8() {
        assert_eq!(PipelineAction::from_u8(0), PipelineAction::Drop);
        assert_eq!(PipelineAction::from_u8(1), PipelineAction::Emit);
        assert_eq!(PipelineAction::from_u8(2), PipelineAction::Error);
        assert_eq!(PipelineAction::from_u8(255), PipelineAction::Error);
    }

    #[test]
    fn pipeline_id_equality() {
        assert_eq!(PipelineId(0), PipelineId(0));
        assert_ne!(PipelineId(0), PipelineId(1));
    }

    #[test]
    fn pipeline_stats_record() {
        let stats = PipelineStats::new();
        stats.record(PipelineAction::Emit, 100);
        stats.record(PipelineAction::Drop, 50);
        stats.record(PipelineAction::Emit, 80);

        assert_eq!(stats.events_processed.load(Ordering::Relaxed), 3);
        assert_eq!(stats.events_emitted.load(Ordering::Relaxed), 2);
        assert_eq!(stats.events_dropped.load(Ordering::Relaxed), 1);
        assert_eq!(stats.total_ns.load(Ordering::Relaxed), 230);
    }

    #[test]
    fn pipeline_stats_error_counts_as_processed_only() {
        let stats = PipelineStats::default();
        stats.record(PipelineAction::Error, 25);

        assert_eq!(stats.events_processed.load(Ordering::Relaxed), 1);
        assert_eq!(stats.events_emitted.load(Ordering::Relaxed), 0);
        assert_eq!(stats.events_dropped.load(Ordering::Relaxed), 0);
        assert_eq!(stats.total_ns.load(Ordering::Relaxed), 25);
    }

    #[test]
    fn compiled_pipeline_execute() {
        let row_schema = int64_row_schema();
        let compiled = CompiledPipeline::new(
            PipelineId(0),
            always_emit,
            Arc::clone(&row_schema),
            row_schema,
        );
        assert_eq!(run_once(&compiled), PipelineAction::Emit);
    }

    #[test]
    fn compiled_pipeline_drop_action() {
        let row_schema = int64_row_schema();
        let compiled = CompiledPipeline::new(
            PipelineId(1),
            always_drop,
            Arc::clone(&row_schema),
            row_schema,
        );
        assert_eq!(run_once(&compiled), PipelineAction::Drop);
    }

    #[test]
    fn compiled_pipeline_execute_does_not_record_stats() {
        let row_schema = int64_row_schema();
        let compiled = CompiledPipeline::new(
            PipelineId(2),
            always_emit,
            Arc::clone(&row_schema),
            row_schema,
        );
        assert_eq!(run_once(&compiled), PipelineAction::Emit);
        assert_eq!(compiled.stats.events_processed.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn compiled_pipeline_func_returns_compiled_fn() {
        let row_schema = int64_row_schema();
        let compiled = CompiledPipeline::new(
            PipelineId(3),
            always_emit,
            Arc::clone(&row_schema),
            row_schema,
        );
        let func = compiled.func();
        // SAFETY: `always_emit` ignores both pointers, so null is acceptable here.
        let code = unsafe { func(std::ptr::null(), std::ptr::null_mut()) };
        assert_eq!(PipelineAction::from_u8(code), PipelineAction::Emit);
    }

    #[test]
    fn pipeline_debug_impl() {
        let row_schema = int64_row_schema();
        let compiled = CompiledPipeline::new(
            PipelineId(42),
            always_drop,
            Arc::clone(&row_schema),
            row_schema,
        );
        let dbg = format!("{compiled:?}");
        assert!(dbg.contains("42"));
        assert!(dbg.contains("native fn"));
    }
}