1use std::sync::Arc;
21
22use arrow::array::RecordBatch;
23use arrow_schema::SchemaRef;
24
25use super::pipeline::PipelineBreaker;
26
27pub trait Ring1Operator: Send {
34 fn process_batch(&mut self, batch: RecordBatch) -> Vec<RecordBatch>;
39
40 fn advance_watermark(&mut self, timestamp: i64) -> Vec<RecordBatch>;
44
45 fn output_schema(&self) -> SchemaRef;
47
48 fn checkpoint(&self) -> Vec<u8>;
50
51 fn restore(&mut self, state: &[u8]);
53}
54
55pub struct BreakerExecutor {
60 breaker: PipelineBreaker,
62 operator: Box<dyn Ring1Operator>,
64 batches_in: u64,
66 batches_out: u64,
68}
69
70impl BreakerExecutor {
71 #[must_use]
73 pub fn new(breaker: PipelineBreaker, operator: Box<dyn Ring1Operator>) -> Self {
74 Self {
75 breaker,
76 operator,
77 batches_in: 0,
78 batches_out: 0,
79 }
80 }
81
82 pub fn process(&mut self, batch: RecordBatch) -> Vec<RecordBatch> {
86 self.batches_in += 1;
87 let out = self.operator.process_batch(batch);
88 self.batches_out += out.len() as u64;
89 out
90 }
91
92 pub fn advance_watermark(&mut self, timestamp: i64) -> Vec<RecordBatch> {
94 let out = self.operator.advance_watermark(timestamp);
95 self.batches_out += out.len() as u64;
96 out
97 }
98
99 #[must_use]
101 pub fn breaker(&self) -> &PipelineBreaker {
102 &self.breaker
103 }
104
105 #[must_use]
107 pub fn output_schema(&self) -> SchemaRef {
108 self.operator.output_schema()
109 }
110
111 #[must_use]
113 pub fn batches_in(&self) -> u64 {
114 self.batches_in
115 }
116
117 #[must_use]
119 pub fn batches_out(&self) -> u64 {
120 self.batches_out
121 }
122
123 #[must_use]
125 pub fn checkpoint(&self) -> Vec<u8> {
126 self.operator.checkpoint()
127 }
128
129 pub fn restore(&mut self, state: &[u8]) {
131 self.operator.restore(state);
132 }
133}
134
135impl std::fmt::Debug for BreakerExecutor {
136 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137 f.debug_struct("BreakerExecutor")
138 .field("breaker", &self.breaker)
139 .field("batches_in", &self.batches_in)
140 .field("batches_out", &self.batches_out)
141 .finish_non_exhaustive()
142 }
143}
144
145pub struct CompiledQueryGraph {
153 executors: Vec<BreakerExecutor>,
155 input_schema: SchemaRef,
157 output_schema: SchemaRef,
159}
160
161impl CompiledQueryGraph {
162 #[must_use]
164 pub fn new(
165 executors: Vec<BreakerExecutor>,
166 input_schema: SchemaRef,
167 output_schema: SchemaRef,
168 ) -> Self {
169 Self {
170 executors,
171 input_schema,
172 output_schema,
173 }
174 }
175
176 #[must_use]
178 pub fn empty(schema: SchemaRef) -> Self {
179 Self {
180 executors: Vec::new(),
181 input_schema: Arc::clone(&schema),
182 output_schema: schema,
183 }
184 }
185
186 #[must_use]
188 pub fn executor_count(&self) -> usize {
189 self.executors.len()
190 }
191
192 #[must_use]
194 pub fn executors(&self) -> &[BreakerExecutor] {
195 &self.executors
196 }
197
198 pub fn executors_mut(&mut self) -> &mut [BreakerExecutor] {
200 &mut self.executors
201 }
202
203 #[must_use]
205 pub fn input_schema(&self) -> &SchemaRef {
206 &self.input_schema
207 }
208
209 #[must_use]
211 pub fn output_schema(&self) -> &SchemaRef {
212 &self.output_schema
213 }
214
215 pub fn process_batch(&mut self, batch: RecordBatch) -> Vec<RecordBatch> {
220 if self.executors.is_empty() {
221 return vec![batch];
222 }
223
224 let mut current = vec![batch];
225 for executor in &mut self.executors {
226 let mut next = Vec::new();
227 for b in current {
228 next.extend(executor.process(b));
229 }
230 current = next;
231 }
232 current
233 }
234
235 pub fn advance_watermark(&mut self, timestamp: i64) -> Vec<RecordBatch> {
237 let mut output = Vec::new();
238 for executor in &mut self.executors {
239 output.extend(executor.advance_watermark(timestamp));
240 }
241 output
242 }
243
244 #[must_use]
246 pub fn checkpoint(&self) -> Vec<Vec<u8>> {
247 self.executors
248 .iter()
249 .map(BreakerExecutor::checkpoint)
250 .collect()
251 }
252
253 pub fn restore(&mut self, states: &[Vec<u8>]) {
259 assert_eq!(
260 states.len(),
261 self.executors.len(),
262 "state count mismatch: expected {}, got {}",
263 self.executors.len(),
264 states.len()
265 );
266 for (executor, state) in self.executors.iter_mut().zip(states.iter()) {
267 executor.restore(state);
268 }
269 }
270}
271
272impl std::fmt::Debug for CompiledQueryGraph {
273 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
274 f.debug_struct("CompiledQueryGraph")
275 .field("executors", &self.executors.len())
276 .field("input_schema_fields", &self.input_schema.fields().len())
277 .field("output_schema_fields", &self.output_schema.fields().len())
278 .finish()
279 }
280}
281
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Float64Array, Int64Array};
    use arrow_schema::{DataType, Field, Schema};
    use datafusion_expr::col;

    /// Stateless operator that forwards each input batch unchanged and emits
    /// nothing on watermarks.
    struct PassthroughOperator {
        schema: SchemaRef,
    }

    impl PassthroughOperator {
        fn new(schema: SchemaRef) -> Self {
            Self { schema }
        }
    }

    impl Ring1Operator for PassthroughOperator {
        fn process_batch(&mut self, batch: RecordBatch) -> Vec<RecordBatch> {
            vec![batch]
        }

        fn advance_watermark(&mut self, _timestamp: i64) -> Vec<RecordBatch> {
            vec![]
        }

        fn output_schema(&self) -> SchemaRef {
            Arc::clone(&self.schema)
        }

        fn checkpoint(&self) -> Vec<u8> {
            vec![]
        }

        fn restore(&mut self, _state: &[u8]) {}
    }

    /// Operator that holds every input batch until the next watermark, then
    /// releases all of them at once.
    ///
    /// Its checkpoint records only the number of buffered batches (as a
    /// little-endian `u32`), not the batches themselves.
    struct BufferingOperator {
        schema: SchemaRef,
        buffer: Vec<RecordBatch>,
    }

    impl BufferingOperator {
        fn new(schema: SchemaRef) -> Self {
            Self {
                schema,
                buffer: Vec::new(),
            }
        }
    }

    impl Ring1Operator for BufferingOperator {
        fn process_batch(&mut self, batch: RecordBatch) -> Vec<RecordBatch> {
            self.buffer.push(batch);
            vec![]
        }

        fn advance_watermark(&mut self, _timestamp: i64) -> Vec<RecordBatch> {
            std::mem::take(&mut self.buffer)
        }

        fn output_schema(&self) -> SchemaRef {
            Arc::clone(&self.schema)
        }

        fn checkpoint(&self) -> Vec<u8> {
            let len = u32::try_from(self.buffer.len())
                .expect("test buffer holds far fewer than u32::MAX batches");
            len.to_le_bytes().to_vec()
        }

        fn restore(&mut self, state: &[u8]) {
            // Only the count is checkpointed, so restoring just validates it.
            // States shorter than 4 bytes are ignored.
            if let Some(prefix) = state.get(..4) {
                let count = u32::from_le_bytes(
                    prefix.try_into().expect("prefix is exactly 4 bytes long"),
                );
                assert!(count <= 1000, "sanity check");
            }
        }
    }

    /// Two-column schema shared by the tests: non-nullable `x: Int64` and
    /// `y: Float64`.
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("x", DataType::Int64, false),
            Field::new("y", DataType::Float64, false),
        ]))
    }

    /// Three-row batch matching `test_schema()`.
    fn test_batch(schema: &SchemaRef) -> RecordBatch {
        RecordBatch::try_new(
            Arc::clone(schema),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])),
            ],
        )
        .unwrap()
    }

    /// An `Aggregate` breaker with no grouping or aggregate expressions.
    fn empty_aggregate() -> PipelineBreaker {
        PipelineBreaker::Aggregate {
            group_exprs: vec![],
            aggr_exprs: vec![],
        }
    }

    #[test]
    fn executor_passthrough() {
        let schema = test_schema();
        let op = Box::new(PassthroughOperator::new(Arc::clone(&schema)));
        let mut exec = BreakerExecutor::new(empty_aggregate(), op);

        let out = exec.process(test_batch(&schema));
        assert_eq!(out.len(), 1);
        assert_eq!(out[0].num_rows(), 3);
        assert_eq!(exec.batches_in(), 1);
        assert_eq!(exec.batches_out(), 1);
    }

    #[test]
    fn executor_buffering_emits_on_watermark() {
        let schema = test_schema();
        let op = Box::new(BufferingOperator::new(Arc::clone(&schema)));
        let breaker = PipelineBreaker::Aggregate {
            group_exprs: vec![col("key")],
            aggr_exprs: vec![],
        };
        let mut exec = BreakerExecutor::new(breaker, op);

        // The batch is buffered, so nothing is emitted yet.
        let out = exec.process(test_batch(&schema));
        assert!(out.is_empty());
        assert_eq!(exec.batches_in(), 1);
        assert_eq!(exec.batches_out(), 0);

        // The watermark releases the buffered batch.
        let out = exec.advance_watermark(100);
        assert_eq!(out.len(), 1);
        assert_eq!(out[0].num_rows(), 3);
        assert_eq!(exec.batches_out(), 1);
    }

    #[test]
    fn executor_output_schema() {
        let schema = test_schema();
        let op = Box::new(PassthroughOperator::new(Arc::clone(&schema)));
        let breaker = PipelineBreaker::Sort {
            order_exprs: vec![],
        };
        let exec = BreakerExecutor::new(breaker, op);
        assert_eq!(exec.output_schema().fields().len(), 2);
    }

    #[test]
    fn executor_checkpoint_restore() {
        let schema = test_schema();
        let op = Box::new(BufferingOperator::new(Arc::clone(&schema)));
        let mut exec = BreakerExecutor::new(empty_aggregate(), op);

        assert!(exec.process(test_batch(&schema)).is_empty());

        // One buffered batch, encoded as a little-endian u32 count.
        let state = exec.checkpoint();
        assert_eq!(state, 1u32.to_le_bytes());

        exec.restore(&state);
        // BufferingOperator::restore leaves the live buffer untouched.
        assert_eq!(exec.advance_watermark(0).len(), 1);
    }

    #[test]
    fn executor_debug() {
        let schema = test_schema();
        let op = Box::new(PassthroughOperator::new(schema));
        let exec = BreakerExecutor::new(PipelineBreaker::Sink, op);
        let s = format!("{exec:?}");
        assert!(s.contains("BreakerExecutor"));
    }

    #[test]
    fn empty_graph_passthrough() {
        let schema = test_schema();
        let mut graph = CompiledQueryGraph::empty(Arc::clone(&schema));

        assert_eq!(graph.executor_count(), 0);

        let out = graph.process_batch(test_batch(&schema));
        assert_eq!(out.len(), 1);
        assert_eq!(out[0].num_rows(), 3);
    }

    #[test]
    fn single_breaker_graph() {
        let schema = test_schema();
        let op = Box::new(PassthroughOperator::new(Arc::clone(&schema)));
        let exec = BreakerExecutor::new(empty_aggregate(), op);

        let mut graph =
            CompiledQueryGraph::new(vec![exec], Arc::clone(&schema), Arc::clone(&schema));

        assert_eq!(graph.executor_count(), 1);

        let out = graph.process_batch(test_batch(&schema));
        assert_eq!(out.len(), 1);
    }

    #[test]
    fn multiple_breaker_graph() {
        let schema = test_schema();
        let executors = vec![
            BreakerExecutor::new(
                empty_aggregate(),
                Box::new(PassthroughOperator::new(Arc::clone(&schema))),
            ),
            BreakerExecutor::new(
                PipelineBreaker::Sort {
                    order_exprs: vec![],
                },
                Box::new(PassthroughOperator::new(Arc::clone(&schema))),
            ),
        ];

        let mut graph =
            CompiledQueryGraph::new(executors, Arc::clone(&schema), Arc::clone(&schema));

        assert_eq!(graph.executor_count(), 2);

        let out = graph.process_batch(test_batch(&schema));
        assert_eq!(out.len(), 1);
    }

    #[test]
    fn graph_watermark_propagation() {
        let schema = test_schema();
        let executors = vec![BreakerExecutor::new(
            empty_aggregate(),
            Box::new(BufferingOperator::new(Arc::clone(&schema))),
        )];

        let mut graph =
            CompiledQueryGraph::new(executors, Arc::clone(&schema), Arc::clone(&schema));

        // Buffered: nothing leaves the graph yet.
        let out = graph.process_batch(test_batch(&schema));
        assert!(out.is_empty());

        // The watermark flushes the buffered batch out of the graph.
        let out = graph.advance_watermark(1000);
        assert_eq!(out.len(), 1);
    }

    #[test]
    fn graph_watermark_through_two_stages() {
        let schema = test_schema();
        let executors = vec![
            BreakerExecutor::new(
                empty_aggregate(),
                Box::new(BufferingOperator::new(Arc::clone(&schema))),
            ),
            BreakerExecutor::new(
                PipelineBreaker::Sort {
                    order_exprs: vec![],
                },
                Box::new(PassthroughOperator::new(Arc::clone(&schema))),
            ),
        ];

        let mut graph =
            CompiledQueryGraph::new(executors, Arc::clone(&schema), Arc::clone(&schema));

        assert!(graph.process_batch(test_batch(&schema)).is_empty());

        let out = graph.advance_watermark(1000);
        assert_eq!(out.len(), 1);
        assert_eq!(out[0].num_rows(), 3);
        assert_eq!(graph.executors()[0].batches_out(), 1);
    }

    #[test]
    fn graph_checkpoint_restore() {
        let schema = test_schema();
        let executors = vec![
            BreakerExecutor::new(
                empty_aggregate(),
                Box::new(BufferingOperator::new(Arc::clone(&schema))),
            ),
            BreakerExecutor::new(
                PipelineBreaker::Sort {
                    order_exprs: vec![],
                },
                Box::new(PassthroughOperator::new(Arc::clone(&schema))),
            ),
        ];

        let mut graph =
            CompiledQueryGraph::new(executors, Arc::clone(&schema), Arc::clone(&schema));

        assert!(graph.process_batch(test_batch(&schema)).is_empty());

        let states = graph.checkpoint();
        assert_eq!(states.len(), 2);
        // Buffering stage holds one batch; the passthrough stage is stateless.
        assert_eq!(states[0], 1u32.to_le_bytes());
        assert!(states[1].is_empty());

        graph.restore(&states);
    }

    #[test]
    #[should_panic(expected = "state count mismatch")]
    fn graph_restore_rejects_wrong_state_count() {
        let schema = test_schema();
        let executors = vec![BreakerExecutor::new(
            PipelineBreaker::Sink,
            Box::new(PassthroughOperator::new(Arc::clone(&schema))),
        )];
        let mut graph =
            CompiledQueryGraph::new(executors, Arc::clone(&schema), Arc::clone(&schema));

        graph.restore(&[]);
    }

    #[test]
    fn graph_debug() {
        let schema = test_schema();
        let graph = CompiledQueryGraph::empty(schema);
        let s = format!("{graph:?}");
        assert!(s.contains("CompiledQueryGraph"));
        assert!(s.contains("executors"));
    }

    #[test]
    fn graph_schemas() {
        let schema = test_schema();
        let graph = CompiledQueryGraph::empty(Arc::clone(&schema));
        assert_eq!(graph.input_schema().fields().len(), 2);
        assert_eq!(graph.output_schema().fields().len(), 2);
    }

    #[test]
    fn graph_executors_mut() {
        let schema = test_schema();
        let executors = vec![BreakerExecutor::new(
            PipelineBreaker::Sink,
            Box::new(PassthroughOperator::new(Arc::clone(&schema))),
        )];

        let mut graph =
            CompiledQueryGraph::new(executors, Arc::clone(&schema), Arc::clone(&schema));

        assert_eq!(graph.executors_mut().len(), 1);
    }
}