Skip to main content

laminar_core/compiler/
breaker_executor.rs

1//! Compiled stateful pipeline bridge — wires pipeline breakers to Ring 1 operators.
2//!
3//! When a query plan contains stateful operators (aggregation, sort, join),
4//! the compiler splits execution into multiple compiled segments separated by
5//! breaker boundaries. Each [`BreakerExecutor`] handles the Ring 1 operator
6//! at a pipeline boundary, consuming output from an upstream compiled segment
7//! and producing input for a downstream one.
8//!
9//! # Architecture
10//!
11//! ```text
12//!    Compiled Segment 1        BreakerExecutor        Compiled Segment 2
13//!   (Filter + Project)       (Ring 1 Operator)       (Output Projection)
14//!   ┌──────────────────┐    ┌──────────────────┐    ┌──────────────────┐
15//!   │ fn(in, out) → u8 │──► │  process_batch   │──► │ fn(in, out) → u8 │
16//!   └──────────────────┘    └──────────────────┘    └──────────────────┘
17//!        PipelineBridge          State Store          PipelineBridge
18//! ```
19
20use std::sync::Arc;
21
22use arrow::array::RecordBatch;
23use arrow_schema::SchemaRef;
24
25use super::pipeline::PipelineBreaker;
26
27/// A Ring 1 operator that processes batches at a pipeline boundary.
28///
29/// Implementations handle stateful operations like windowed aggregation,
30/// stream joins, and sorting. The operator receives `RecordBatch`es from
31/// the upstream compiled segment and produces output batches for the
32/// downstream segment.
33pub trait Ring1Operator: Send {
34    /// Processes an incoming `RecordBatch` from the upstream compiled segment.
35    ///
36    /// Returns zero or more output batches. For windowed aggregation, output
37    /// may be empty until a window trigger fires.
38    fn process_batch(&mut self, batch: RecordBatch) -> Vec<RecordBatch>;
39
40    /// Advances the watermark — may trigger window emissions.
41    ///
42    /// Returns any batches emitted by window closure or timer firing.
43    fn advance_watermark(&mut self, timestamp: i64) -> Vec<RecordBatch>;
44
45    /// Returns the output schema for batches produced by this operator.
46    fn output_schema(&self) -> SchemaRef;
47
48    /// Serializes operator state for checkpointing.
49    fn checkpoint(&self) -> Vec<u8>;
50
51    /// Restores operator state from a checkpoint.
52    fn restore(&mut self, state: &[u8]);
53}
54
55/// Executes a Ring 1 stateful operator at a pipeline boundary.
56///
57/// Wraps a [`Ring1Operator`] with the breaker metadata and tracks
58/// per-executor statistics.
59pub struct BreakerExecutor {
60    /// The type of breaker (aggregate, sort, join).
61    breaker: PipelineBreaker,
62    /// The Ring 1 operator implementation.
63    operator: Box<dyn Ring1Operator>,
64    /// Count of batches processed.
65    batches_in: u64,
66    /// Count of batches emitted.
67    batches_out: u64,
68}
69
70impl BreakerExecutor {
71    /// Creates a new executor for the given breaker and operator.
72    #[must_use]
73    pub fn new(breaker: PipelineBreaker, operator: Box<dyn Ring1Operator>) -> Self {
74        Self {
75            breaker,
76            operator,
77            batches_in: 0,
78            batches_out: 0,
79        }
80    }
81
82    /// Processes a batch from the upstream compiled segment.
83    ///
84    /// Returns any output batches ready for the downstream segment.
85    pub fn process(&mut self, batch: RecordBatch) -> Vec<RecordBatch> {
86        self.batches_in += 1;
87        let out = self.operator.process_batch(batch);
88        self.batches_out += out.len() as u64;
89        out
90    }
91
92    /// Advances the watermark through the operator.
93    pub fn advance_watermark(&mut self, timestamp: i64) -> Vec<RecordBatch> {
94        let out = self.operator.advance_watermark(timestamp);
95        self.batches_out += out.len() as u64;
96        out
97    }
98
99    /// Returns the breaker type.
100    #[must_use]
101    pub fn breaker(&self) -> &PipelineBreaker {
102        &self.breaker
103    }
104
105    /// Returns the output schema of the operator.
106    #[must_use]
107    pub fn output_schema(&self) -> SchemaRef {
108        self.operator.output_schema()
109    }
110
111    /// Returns the number of input batches processed.
112    #[must_use]
113    pub fn batches_in(&self) -> u64 {
114        self.batches_in
115    }
116
117    /// Returns the number of output batches emitted.
118    #[must_use]
119    pub fn batches_out(&self) -> u64 {
120        self.batches_out
121    }
122
123    /// Checkpoints the operator state.
124    #[must_use]
125    pub fn checkpoint(&self) -> Vec<u8> {
126        self.operator.checkpoint()
127    }
128
129    /// Restores the operator state.
130    pub fn restore(&mut self, state: &[u8]) {
131        self.operator.restore(state);
132    }
133}
134
135impl std::fmt::Debug for BreakerExecutor {
136    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137        f.debug_struct("BreakerExecutor")
138            .field("breaker", &self.breaker)
139            .field("batches_in", &self.batches_in)
140            .field("batches_out", &self.batches_out)
141            .finish_non_exhaustive()
142    }
143}
144
145/// A graph of compiled segments connected by Ring 1 operators.
146///
147/// Each segment is a compiled pipeline; between segments sit [`BreakerExecutor`]s
148/// that handle stateful operations. The graph executes in topological order:
149/// segment 0 → executor 0 → segment 1 → executor 1 → ...
150///
151/// For the MVP, this is a linear chain (no fan-out or fan-in).
152pub struct CompiledQueryGraph {
153    /// Breaker executors between segments, in execution order.
154    executors: Vec<BreakerExecutor>,
155    /// Input schema for the first segment.
156    input_schema: SchemaRef,
157    /// Output schema of the final segment.
158    output_schema: SchemaRef,
159}
160
161impl CompiledQueryGraph {
162    /// Creates a new graph with the given executors.
163    #[must_use]
164    pub fn new(
165        executors: Vec<BreakerExecutor>,
166        input_schema: SchemaRef,
167        output_schema: SchemaRef,
168    ) -> Self {
169        Self {
170            executors,
171            input_schema,
172            output_schema,
173        }
174    }
175
176    /// Creates an empty graph (no breakers — passthrough).
177    #[must_use]
178    pub fn empty(schema: SchemaRef) -> Self {
179        Self {
180            executors: Vec::new(),
181            input_schema: Arc::clone(&schema),
182            output_schema: schema,
183        }
184    }
185
186    /// Returns the number of breaker executors in the graph.
187    #[must_use]
188    pub fn executor_count(&self) -> usize {
189        self.executors.len()
190    }
191
192    /// Returns a reference to the executors.
193    #[must_use]
194    pub fn executors(&self) -> &[BreakerExecutor] {
195        &self.executors
196    }
197
198    /// Returns a mutable reference to the executors.
199    pub fn executors_mut(&mut self) -> &mut [BreakerExecutor] {
200        &mut self.executors
201    }
202
203    /// Returns the input schema.
204    #[must_use]
205    pub fn input_schema(&self) -> &SchemaRef {
206        &self.input_schema
207    }
208
209    /// Returns the output schema.
210    #[must_use]
211    pub fn output_schema(&self) -> &SchemaRef {
212        &self.output_schema
213    }
214
215    /// Processes a batch through all executors in sequence.
216    ///
217    /// The batch enters the first executor; its output feeds the next, and so on.
218    /// Returns the final output batches after all executors have processed.
219    pub fn process_batch(&mut self, batch: RecordBatch) -> Vec<RecordBatch> {
220        if self.executors.is_empty() {
221            return vec![batch];
222        }
223
224        let mut current = vec![batch];
225        for executor in &mut self.executors {
226            let mut next = Vec::new();
227            for b in current {
228                next.extend(executor.process(b));
229            }
230            current = next;
231        }
232        current
233    }
234
235    /// Advances watermark through all executors.
236    pub fn advance_watermark(&mut self, timestamp: i64) -> Vec<RecordBatch> {
237        let mut output = Vec::new();
238        for executor in &mut self.executors {
239            output.extend(executor.advance_watermark(timestamp));
240        }
241        output
242    }
243
244    /// Checkpoints all executor states.
245    #[must_use]
246    pub fn checkpoint(&self) -> Vec<Vec<u8>> {
247        self.executors
248            .iter()
249            .map(BreakerExecutor::checkpoint)
250            .collect()
251    }
252
253    /// Restores all executor states from checkpoint data.
254    ///
255    /// # Panics
256    ///
257    /// Panics if `states.len() != self.executor_count()`.
258    pub fn restore(&mut self, states: &[Vec<u8>]) {
259        assert_eq!(
260            states.len(),
261            self.executors.len(),
262            "state count mismatch: expected {}, got {}",
263            self.executors.len(),
264            states.len()
265        );
266        for (executor, state) in self.executors.iter_mut().zip(states.iter()) {
267            executor.restore(state);
268        }
269    }
270}
271
272impl std::fmt::Debug for CompiledQueryGraph {
273    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
274        f.debug_struct("CompiledQueryGraph")
275            .field("executors", &self.executors.len())
276            .field("input_schema_fields", &self.input_schema.fields().len())
277            .field("output_schema_fields", &self.output_schema.fields().len())
278            .finish()
279    }
280}
281
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Float64Array, Int64Array};
    use arrow_schema::{DataType, Field, Schema};
    use datafusion_expr::col;

    /// Ring 1 operator that hands every input batch straight back.
    struct PassthroughOperator {
        schema: SchemaRef,
    }

    impl PassthroughOperator {
        fn new(schema: SchemaRef) -> Self {
            Self { schema }
        }
    }

    impl Ring1Operator for PassthroughOperator {
        fn process_batch(&mut self, batch: RecordBatch) -> Vec<RecordBatch> {
            vec![batch]
        }

        fn advance_watermark(&mut self, _timestamp: i64) -> Vec<RecordBatch> {
            Vec::new()
        }

        fn output_schema(&self) -> SchemaRef {
            Arc::clone(&self.schema)
        }

        fn checkpoint(&self) -> Vec<u8> {
            Vec::new()
        }

        fn restore(&mut self, _state: &[u8]) {}
    }

    /// Ring 1 operator that holds batches back until a watermark arrives.
    struct BufferingOperator {
        schema: SchemaRef,
        buffer: Vec<RecordBatch>,
    }

    impl BufferingOperator {
        fn new(schema: SchemaRef) -> Self {
            let buffer = Vec::new();
            Self { schema, buffer }
        }
    }

    impl Ring1Operator for BufferingOperator {
        fn process_batch(&mut self, batch: RecordBatch) -> Vec<RecordBatch> {
            // Nothing is emitted here; the batch waits for the watermark.
            self.buffer.push(batch);
            Vec::new()
        }

        fn advance_watermark(&mut self, _timestamp: i64) -> Vec<RecordBatch> {
            // Drain everything buffered so far, leaving an empty buffer behind.
            std::mem::take(&mut self.buffer)
        }

        fn output_schema(&self) -> SchemaRef {
            Arc::clone(&self.schema)
        }

        fn checkpoint(&self) -> Vec<u8> {
            // Only the number of buffered batches is recorded.
            #[allow(clippy::cast_possible_truncation)]
            let buffered = self.buffer.len() as u32;
            Vec::from(buffered.to_le_bytes())
        }

        fn restore(&mut self, state: &[u8]) {
            // The batches themselves cannot be rebuilt from a count, so this
            // only checks that the checkpoint bytes round-trip sensibly.
            if let Some(prefix) = state.get(..4) {
                let count = u32::from_le_bytes(prefix.try_into().unwrap());
                assert!(count <= 1000, "sanity check");
            }
        }
    }

    /// Two-column schema: non-null `x: Int64` and `y: Float64`.
    fn test_schema() -> SchemaRef {
        let fields = vec![
            Field::new("x", DataType::Int64, false),
            Field::new("y", DataType::Float64, false),
        ];
        Arc::new(Schema::new(fields))
    }

    /// Three-row batch matching [`test_schema`].
    fn test_batch(schema: &SchemaRef) -> RecordBatch {
        let built = RecordBatch::try_new(
            Arc::clone(schema),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])),
            ],
        );
        built.unwrap()
    }

    /// Aggregate breaker without grouping or aggregate expressions.
    fn bare_aggregate() -> PipelineBreaker {
        PipelineBreaker::Aggregate {
            group_exprs: Vec::new(),
            aggr_exprs: Vec::new(),
        }
    }

    /// Sort breaker without ordering expressions.
    fn bare_sort() -> PipelineBreaker {
        PipelineBreaker::Sort {
            order_exprs: Vec::new(),
        }
    }

    /// Boxed passthrough operator sharing `schema`.
    fn passthrough(schema: &SchemaRef) -> Box<dyn Ring1Operator> {
        Box::new(PassthroughOperator::new(Arc::clone(schema)))
    }

    /// Boxed buffering operator sharing `schema`.
    fn buffering(schema: &SchemaRef) -> Box<dyn Ring1Operator> {
        Box::new(BufferingOperator::new(Arc::clone(schema)))
    }

    /// Graph over `executors` whose input and output schema are both `schema`.
    fn graph_of(executors: Vec<BreakerExecutor>, schema: &SchemaRef) -> CompiledQueryGraph {
        CompiledQueryGraph::new(executors, Arc::clone(schema), Arc::clone(schema))
    }

    // ── BreakerExecutor tests ────────────────────────────────────

    #[test]
    fn executor_passthrough() {
        let schema = test_schema();
        let mut executor = BreakerExecutor::new(bare_aggregate(), passthrough(&schema));

        let emitted = executor.process(test_batch(&schema));

        assert_eq!(emitted.len(), 1);
        assert_eq!(emitted[0].num_rows(), 3);
        assert_eq!(executor.batches_in(), 1);
        assert_eq!(executor.batches_out(), 1);
    }

    #[test]
    fn executor_buffering_emits_on_watermark() {
        let schema = test_schema();
        let keyed_aggregate = PipelineBreaker::Aggregate {
            group_exprs: vec![col("key")],
            aggr_exprs: Vec::new(),
        };
        let mut executor = BreakerExecutor::new(keyed_aggregate, buffering(&schema));

        // The batch is buffered, so processing yields nothing.
        let held = executor.process(test_batch(&schema));
        assert!(held.is_empty());

        // The watermark releases the buffered batch.
        let released = executor.advance_watermark(100);
        assert_eq!(released.len(), 1);
        assert_eq!(executor.batches_out(), 1);
    }

    #[test]
    fn executor_output_schema() {
        let schema = test_schema();
        let executor = BreakerExecutor::new(bare_sort(), passthrough(&schema));

        let field_count = executor.output_schema().fields().len();
        assert_eq!(field_count, 2);
    }

    #[test]
    fn executor_checkpoint_restore() {
        let schema = test_schema();
        let mut executor = BreakerExecutor::new(bare_aggregate(), buffering(&schema));

        // Put one batch into the buffer so there is state to capture.
        executor.process(test_batch(&schema));

        let snapshot = executor.checkpoint();
        assert!(!snapshot.is_empty());

        executor.restore(&snapshot);
    }

    #[test]
    fn executor_debug() {
        let schema = test_schema();
        let executor = BreakerExecutor::new(PipelineBreaker::Sink, passthrough(&schema));

        let rendered = format!("{executor:?}");
        assert!(rendered.contains("BreakerExecutor"));
    }

    // ── CompiledQueryGraph tests ─────────────────────────────────

    #[test]
    fn empty_graph_passthrough() {
        let schema = test_schema();
        let mut graph = CompiledQueryGraph::empty(Arc::clone(&schema));
        assert_eq!(graph.executor_count(), 0);

        let emitted = graph.process_batch(test_batch(&schema));
        assert_eq!(emitted.len(), 1);
        assert_eq!(emitted[0].num_rows(), 3);
    }

    #[test]
    fn single_breaker_graph() {
        let schema = test_schema();
        let only = BreakerExecutor::new(bare_aggregate(), passthrough(&schema));
        let mut graph = graph_of(vec![only], &schema);
        assert_eq!(graph.executor_count(), 1);

        let emitted = graph.process_batch(test_batch(&schema));
        assert_eq!(emitted.len(), 1);
    }

    #[test]
    fn multiple_breaker_graph() {
        let schema = test_schema();
        let first = BreakerExecutor::new(bare_aggregate(), passthrough(&schema));
        let second = BreakerExecutor::new(bare_sort(), passthrough(&schema));
        let mut graph = graph_of(vec![first, second], &schema);
        assert_eq!(graph.executor_count(), 2);

        let emitted = graph.process_batch(test_batch(&schema));
        assert_eq!(emitted.len(), 1);
    }

    #[test]
    fn graph_watermark_propagation() {
        let schema = test_schema();
        let only = BreakerExecutor::new(bare_aggregate(), buffering(&schema));
        let mut graph = graph_of(vec![only], &schema);

        // Processing only fills the buffer.
        let held = graph.process_batch(test_batch(&schema));
        assert!(held.is_empty());

        // The watermark flushes it.
        let released = graph.advance_watermark(1000);
        assert_eq!(released.len(), 1);
    }

    #[test]
    fn graph_checkpoint_restore() {
        let schema = test_schema();
        let first = BreakerExecutor::new(bare_aggregate(), buffering(&schema));
        let second = BreakerExecutor::new(bare_sort(), passthrough(&schema));
        let mut graph = graph_of(vec![first, second], &schema);

        // Run one batch through so the buffering operator has state.
        graph.process_batch(test_batch(&schema));

        let snapshots = graph.checkpoint();
        assert_eq!(snapshots.len(), 2);

        graph.restore(&snapshots);
    }

    #[test]
    fn graph_debug() {
        let graph = CompiledQueryGraph::empty(test_schema());

        let rendered = format!("{graph:?}");
        assert!(rendered.contains("CompiledQueryGraph"));
        assert!(rendered.contains("executors"));
    }

    #[test]
    fn graph_schemas() {
        let schema = test_schema();
        let graph = CompiledQueryGraph::empty(Arc::clone(&schema));

        assert_eq!(graph.input_schema().fields().len(), 2);
        assert_eq!(graph.output_schema().fields().len(), 2);
    }

    #[test]
    fn graph_executors_mut() {
        let schema = test_schema();
        let only = BreakerExecutor::new(PipelineBreaker::Sink, passthrough(&schema));
        let mut graph = graph_of(vec![only], &schema);

        assert_eq!(graph.executors_mut().len(), 1);
    }
}