1use std::hash::{Hash, Hasher};
20use std::sync::atomic::Ordering;
21use std::sync::Arc;
22use std::time::Instant;
23
24use smallvec::SmallVec;
25
26use super::fallback::ExecutablePipeline;
27use super::metrics::{
28 QueryConfig, QueryError, QueryId, QueryMetadata, QueryMetrics, QueryState, SubmitResult,
29};
30use super::pipeline::PipelineAction;
31use super::pipeline_bridge::{BridgeConsumer, PipelineBridge, Ring1Action};
32use super::row::{EventRow, RowSchema};
33
34pub struct StreamingQueryBuilder {
42 sql: String,
43 pipelines: Vec<ExecutablePipeline>,
44 bridges: Vec<PipelineBridge>,
45 consumers: Vec<BridgeConsumer>,
46 schemas: Vec<Arc<RowSchema>>,
47 config: QueryConfig,
48 metadata: QueryMetadata,
49}
50
51impl StreamingQueryBuilder {
52 #[must_use]
54 pub fn new(sql: impl Into<String>) -> Self {
55 Self {
56 sql: sql.into(),
57 pipelines: Vec::new(),
58 bridges: Vec::new(),
59 consumers: Vec::new(),
60 schemas: Vec::new(),
61 config: QueryConfig::default(),
62 metadata: QueryMetadata::default(),
63 }
64 }
65
66 #[must_use]
68 pub fn add_pipeline(
69 mut self,
70 executable: ExecutablePipeline,
71 bridge: PipelineBridge,
72 consumer: BridgeConsumer,
73 schema: Arc<RowSchema>,
74 ) -> Self {
75 self.pipelines.push(executable);
76 self.bridges.push(bridge);
77 self.consumers.push(consumer);
78 self.schemas.push(schema);
79 self
80 }
81
82 #[must_use]
84 pub fn with_config(mut self, config: QueryConfig) -> Self {
85 self.config = config;
86 self
87 }
88
89 #[must_use]
91 pub fn with_metadata(mut self, metadata: QueryMetadata) -> Self {
92 self.metadata = metadata;
93 self
94 }
95
96 pub fn build(self) -> Result<StreamingQuery, QueryError> {
103 if self.pipelines.is_empty() {
104 return Err(QueryError::NoPipelines);
105 }
106
107 let n = self.pipelines.len();
108 if self.bridges.len() != n || self.consumers.len() != n || self.schemas.len() != n {
109 return Err(QueryError::Build(format!(
110 "mismatched component counts: pipelines={n}, bridges={}, consumers={}, schemas={}",
111 self.bridges.len(),
112 self.consumers.len(),
113 self.schemas.len()
114 )));
115 }
116
117 let id = QueryId({
118 let mut hasher = rustc_hash::FxHasher::default();
119 self.sql.as_bytes().hash(&mut hasher);
120 hasher.finish()
121 });
122
123 let output_buffers = (0..n)
124 .map(|_| vec![0u8; self.config.output_buffer_size])
125 .collect();
126
127 Ok(StreamingQuery {
128 id,
129 sql: self.sql,
130 pipelines: self.pipelines,
131 bridges: self.bridges,
132 consumers: self.consumers,
133 schemas: self.schemas,
134 output_buffers,
135 metadata: self.metadata,
136 state: QueryState::Ready,
137 })
138 }
139}
140
141pub struct StreamingQuery {
149 id: QueryId,
150 sql: String,
151 pipelines: Vec<ExecutablePipeline>,
152 bridges: Vec<PipelineBridge>,
153 consumers: Vec<BridgeConsumer>,
154 schemas: Vec<Arc<RowSchema>>,
155 output_buffers: Vec<Vec<u8>>,
156 metadata: QueryMetadata,
157 state: QueryState,
158}
159
160impl StreamingQuery {
161 pub fn start(&mut self) -> Result<(), QueryError> {
169 if self.state != QueryState::Ready {
170 return Err(QueryError::InvalidState {
171 expected: "Ready",
172 actual: self.state,
173 });
174 }
175 self.state = QueryState::Running;
176 Ok(())
177 }
178
179 pub fn pause(&mut self) -> Result<(), QueryError> {
185 if self.state != QueryState::Running {
186 return Err(QueryError::InvalidState {
187 expected: "Running",
188 actual: self.state,
189 });
190 }
191 self.state = QueryState::Paused;
192 Ok(())
193 }
194
195 pub fn resume(&mut self) -> Result<(), QueryError> {
201 if self.state != QueryState::Paused {
202 return Err(QueryError::InvalidState {
203 expected: "Paused",
204 actual: self.state,
205 });
206 }
207 self.state = QueryState::Running;
208 Ok(())
209 }
210
211 pub fn stop(&mut self) -> Result<(), QueryError> {
217 if self.state == QueryState::Stopped {
218 return Err(QueryError::InvalidState {
219 expected: "Ready|Running|Paused",
220 actual: self.state,
221 });
222 }
223 self.state = QueryState::Stopped;
224 Ok(())
225 }
226
227 pub fn submit_row(
241 &mut self,
242 row: &EventRow<'_>,
243 event_time: i64,
244 key_hash: u64,
245 ) -> Result<SubmitResult, QueryError> {
246 if self.state != QueryState::Running {
247 return Err(QueryError::InvalidState {
248 expected: "Running",
249 actual: self.state,
250 });
251 }
252
253 let mut any_emitted = false;
254
255 for i in 0..self.pipelines.len() {
256 match &self.pipelines[i] {
257 ExecutablePipeline::Compiled(compiled) => {
258 let start = Instant::now();
259 let action = {
260 let output_buf = &mut self.output_buffers[i];
261 unsafe { compiled.execute(row.data().as_ptr(), output_buf.as_mut_ptr()) }
264 };
265 #[allow(clippy::cast_possible_truncation)]
266 let elapsed_ns = start.elapsed().as_nanos() as u64;
267 compiled.stats.record(action, elapsed_ns);
268
269 match action {
270 PipelineAction::Emit => {
271 let output_schema = &compiled.output_schema;
273 let output_row = EventRow::new(
274 &self.output_buffers[i][..output_schema.min_row_size()],
275 output_schema,
276 );
277 self.bridges[i].send_event(&output_row, event_time, key_hash)?;
278 any_emitted = true;
279 }
280 PipelineAction::Drop => {
281 }
283 PipelineAction::Error => {
284 return Err(QueryError::PipelineError { pipeline_idx: i });
285 }
286 }
287 }
288 ExecutablePipeline::Fallback { .. } => {
289 self.bridges[i].send_event(row, event_time, key_hash)?;
291 any_emitted = true;
292 }
293 }
294 }
295
296 if any_emitted {
297 Ok(SubmitResult::Emitted)
298 } else {
299 Ok(SubmitResult::Filtered)
300 }
301 }
302
303 pub fn advance_watermark(&self, timestamp: i64) -> Result<(), QueryError> {
312 if self.state != QueryState::Running {
313 return Err(QueryError::InvalidState {
314 expected: "Running",
315 actual: self.state,
316 });
317 }
318 for bridge in &self.bridges {
319 bridge.send_watermark(timestamp)?;
320 }
321 Ok(())
322 }
323
324 pub fn checkpoint(&self, epoch: u64) -> Result<(), QueryError> {
331 if self.state != QueryState::Running {
332 return Err(QueryError::InvalidState {
333 expected: "Running",
334 actual: self.state,
335 });
336 }
337 for bridge in &self.bridges {
338 bridge.send_checkpoint(epoch)?;
339 }
340 Ok(())
341 }
342
343 pub fn send_eof(&self) -> Result<(), QueryError> {
349 for bridge in &self.bridges {
350 bridge.send_eof()?;
351 }
352 Ok(())
353 }
354
355 pub fn poll_ring1(&mut self) -> SmallVec<[Ring1Action; 4]> {
359 let mut actions: SmallVec<[Ring1Action; 4]> = SmallVec::new();
360 for consumer in &mut self.consumers {
361 actions.extend(consumer.drain());
362 }
363 actions
364 }
365
366 pub fn check_latency_flush(&mut self) -> SmallVec<[Ring1Action; 4]> {
368 let mut actions: SmallVec<[Ring1Action; 4]> = SmallVec::new();
369 for consumer in &mut self.consumers {
370 if let Some(action) = consumer.check_latency_flush() {
371 actions.push(action);
372 }
373 }
374 actions
375 }
376
377 pub fn swap(&mut self, mut new: StreamingQuery) -> Result<StreamingQuery, QueryError> {
389 if self.state != QueryState::Running && self.state != QueryState::Paused {
390 return Err(QueryError::InvalidState {
391 expected: "Running|Paused",
392 actual: self.state,
393 });
394 }
395 if self.pipelines.len() != new.pipelines.len() {
396 return Err(QueryError::IncompatibleSchemas(format!(
397 "pipeline count mismatch: current={}, new={}",
398 self.pipelines.len(),
399 new.pipelines.len()
400 )));
401 }
402
403 std::mem::swap(&mut self.pipelines, &mut new.pipelines);
405 std::mem::swap(&mut self.bridges, &mut new.bridges);
406 std::mem::swap(&mut self.consumers, &mut new.consumers);
407 std::mem::swap(&mut self.schemas, &mut new.schemas);
408 std::mem::swap(&mut self.output_buffers, &mut new.output_buffers);
409 std::mem::swap(&mut self.metadata, &mut new.metadata);
410 std::mem::swap(&mut self.sql, &mut new.sql);
411 self.id = QueryId({
412 let mut hasher = rustc_hash::FxHasher::default();
413 self.sql.as_bytes().hash(&mut hasher);
414 hasher.finish()
415 });
416 new.state = QueryState::Stopped;
417
418 Ok(new)
419 }
420
421 #[must_use]
425 pub fn id(&self) -> QueryId {
426 self.id
427 }
428
429 #[must_use]
431 pub fn sql(&self) -> &str {
432 &self.sql
433 }
434
435 #[must_use]
437 pub fn state(&self) -> QueryState {
438 self.state
439 }
440
441 #[must_use]
443 pub fn metadata(&self) -> &QueryMetadata {
444 &self.metadata
445 }
446
447 #[must_use]
449 pub fn pipeline_count(&self) -> usize {
450 self.pipelines.len()
451 }
452
453 #[must_use]
455 pub fn metrics(&self) -> QueryMetrics {
456 let mut m = QueryMetrics::default();
457
458 for pipeline in &self.pipelines {
459 match pipeline {
460 ExecutablePipeline::Compiled(compiled) => {
461 m.pipelines_compiled += 1;
462 m.ring0_events_in += compiled.stats.events_processed.load(Ordering::Relaxed);
463 m.ring0_events_out += compiled.stats.events_emitted.load(Ordering::Relaxed);
464 m.ring0_events_dropped += compiled.stats.events_dropped.load(Ordering::Relaxed);
465 m.ring0_total_ns += compiled.stats.total_ns.load(Ordering::Relaxed);
466 }
467 ExecutablePipeline::Fallback { .. } => {
468 m.pipelines_fallback += 1;
469 }
470 }
471 }
472
473 for consumer in &self.consumers {
474 let snap = consumer.stats().snapshot();
475 m.bridge_backpressure_drops += snap.events_dropped;
476 m.bridge_batches_flushed += snap.batches_flushed;
477 m.ring1_rows_flushed += snap.rows_flushed;
478 }
479
480 m
481 }
482}
483
484impl std::fmt::Debug for StreamingQuery {
485 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
486 f.debug_struct("StreamingQuery")
487 .field("id", &self.id)
488 .field("state", &self.state)
489 .field("pipeline_count", &self.pipelines.len())
490 .finish_non_exhaustive()
491 }
492}
493
494#[cfg(test)]
497#[allow(clippy::cast_precision_loss)]
498mod tests {
499 use super::*;
500 use crate::compiler::pipeline::{CompiledPipeline, PipelineId};
501 use crate::compiler::pipeline_bridge::create_pipeline_bridge;
502 use crate::compiler::policy::{BackpressureStrategy, BatchPolicy};
503 use crate::compiler::row::MutableEventRow;
504 use arrow_schema::{DataType, Field, Schema};
505 use bumpalo::Bump;
506
507 fn make_schema(fields: Vec<(&str, DataType)>) -> Arc<Schema> {
510 Arc::new(Schema::new(
511 fields
512 .into_iter()
513 .map(|(name, dt)| Field::new(name, dt, false))
514 .collect::<Vec<_>>(),
515 ))
516 }
517
518 fn make_row_schema(fields: Vec<(&str, DataType)>) -> Arc<RowSchema> {
519 let arrow = make_schema(fields);
520 Arc::new(RowSchema::from_arrow(&arrow).unwrap())
521 }
522
523 fn make_fallback(
525 id: u32,
526 row_schema: &Arc<RowSchema>,
527 ) -> (
528 ExecutablePipeline,
529 PipelineBridge,
530 BridgeConsumer,
531 Arc<RowSchema>,
532 ) {
533 let exec = ExecutablePipeline::Fallback {
534 pipeline_id: PipelineId(id),
535 reason: crate::compiler::error::CompileError::UnsupportedExpr(
536 "test fallback".to_string(),
537 ),
538 };
539 let (bridge, consumer) = create_pipeline_bridge(
540 Arc::clone(row_schema),
541 64,
542 1024,
543 BatchPolicy::default(),
544 BackpressureStrategy::DropNewest,
545 )
546 .unwrap();
547 (exec, bridge, consumer, Arc::clone(row_schema))
548 }
549
550 fn make_compiled_emit(
552 id: u32,
553 row_schema: &Arc<RowSchema>,
554 ) -> (
555 ExecutablePipeline,
556 PipelineBridge,
557 BridgeConsumer,
558 Arc<RowSchema>,
559 ) {
560 unsafe extern "C" fn always_emit(input: *const u8, output: *mut u8) -> u8 {
561 std::ptr::copy_nonoverlapping(input, output, 64);
563 1 }
565
566 let compiled = Arc::new(CompiledPipeline::new(
567 PipelineId(id),
568 always_emit,
569 Arc::clone(row_schema),
570 Arc::clone(row_schema),
571 ));
572 let exec = ExecutablePipeline::Compiled(compiled);
573 let (bridge, consumer) = create_pipeline_bridge(
574 Arc::clone(row_schema),
575 64,
576 1024,
577 BatchPolicy::default(),
578 BackpressureStrategy::DropNewest,
579 )
580 .unwrap();
581 (exec, bridge, consumer, Arc::clone(row_schema))
582 }
583
584 fn make_compiled_drop(
586 id: u32,
587 row_schema: &Arc<RowSchema>,
588 ) -> (
589 ExecutablePipeline,
590 PipelineBridge,
591 BridgeConsumer,
592 Arc<RowSchema>,
593 ) {
594 unsafe extern "C" fn always_drop(_: *const u8, _: *mut u8) -> u8 {
595 0 }
597
598 let compiled = Arc::new(CompiledPipeline::new(
599 PipelineId(id),
600 always_drop,
601 Arc::clone(row_schema),
602 Arc::clone(row_schema),
603 ));
604 let exec = ExecutablePipeline::Compiled(compiled);
605 let (bridge, consumer) = create_pipeline_bridge(
606 Arc::clone(row_schema),
607 64,
608 1024,
609 BatchPolicy::default(),
610 BackpressureStrategy::DropNewest,
611 )
612 .unwrap();
613 (exec, bridge, consumer, Arc::clone(row_schema))
614 }
615
616 fn make_event_row<'a>(
617 arena: &'a Bump,
618 schema: &'a RowSchema,
619 ts: i64,
620 val: f64,
621 ) -> EventRow<'a> {
622 let mut row = MutableEventRow::new_in(arena, schema, 0);
623 row.set_i64(0, ts);
624 row.set_f64(1, val);
625 row.freeze()
626 }
627
628 fn default_schema() -> Arc<RowSchema> {
629 make_row_schema(vec![("ts", DataType::Int64), ("val", DataType::Float64)])
630 }
631
632 fn build_query_with_fallback(sql: &str) -> StreamingQuery {
633 let schema = default_schema();
634 let (exec, bridge, consumer, s) = make_fallback(0, &schema);
635 StreamingQueryBuilder::new(sql)
636 .add_pipeline(exec, bridge, consumer, s)
637 .build()
638 .unwrap()
639 }
640
641 fn build_query_with_compiled(sql: &str) -> StreamingQuery {
642 let schema = default_schema();
643 let (exec, bridge, consumer, s) = make_compiled_emit(0, &schema);
644 StreamingQueryBuilder::new(sql)
645 .add_pipeline(exec, bridge, consumer, s)
646 .build()
647 .unwrap()
648 }
649
650 #[test]
653 fn builder_empty_error() {
654 let result = StreamingQueryBuilder::new("SELECT 1").build();
655 assert!(result.is_err());
656 assert!(matches!(result.unwrap_err(), QueryError::NoPipelines));
657 }
658
659 #[test]
660 fn builder_single_fallback() {
661 let query = build_query_with_fallback("SELECT * FROM t");
662 assert_eq!(query.pipeline_count(), 1);
663 assert_eq!(query.state(), QueryState::Ready);
664 }
665
666 #[test]
667 fn builder_single_compiled() {
668 let query = build_query_with_compiled("SELECT * FROM t WHERE val > 10");
669 assert_eq!(query.pipeline_count(), 1);
670 assert_eq!(query.state(), QueryState::Ready);
671 }
672
673 #[test]
674 fn builder_multiple_pipelines() {
675 let schema = default_schema();
676 let (e1, b1, c1, s1) = make_fallback(0, &schema);
677 let (e2, b2, c2, s2) = make_compiled_emit(1, &schema);
678 let query = StreamingQueryBuilder::new("SELECT * FROM t")
679 .add_pipeline(e1, b1, c1, s1)
680 .add_pipeline(e2, b2, c2, s2)
681 .build()
682 .unwrap();
683 assert_eq!(query.pipeline_count(), 2);
684 }
685
686 #[test]
689 fn lifecycle_ready_to_running() {
690 let mut query = build_query_with_fallback("SELECT 1");
691 assert!(query.start().is_ok());
692 assert_eq!(query.state(), QueryState::Running);
693 }
694
695 #[test]
696 fn lifecycle_running_to_paused() {
697 let mut query = build_query_with_fallback("SELECT 1");
698 query.start().unwrap();
699 assert!(query.pause().is_ok());
700 assert_eq!(query.state(), QueryState::Paused);
701 }
702
703 #[test]
704 fn lifecycle_paused_to_running() {
705 let mut query = build_query_with_fallback("SELECT 1");
706 query.start().unwrap();
707 query.pause().unwrap();
708 assert!(query.resume().is_ok());
709 assert_eq!(query.state(), QueryState::Running);
710 }
711
712 #[test]
713 fn lifecycle_running_to_stopped() {
714 let mut query = build_query_with_fallback("SELECT 1");
715 query.start().unwrap();
716 assert!(query.stop().is_ok());
717 assert_eq!(query.state(), QueryState::Stopped);
718 }
719
720 #[test]
721 fn lifecycle_stopped_terminal() {
722 let mut query = build_query_with_fallback("SELECT 1");
723 query.start().unwrap();
724 query.stop().unwrap();
725
726 assert!(query.start().is_err());
728 assert!(query.pause().is_err());
729 assert!(query.resume().is_err());
730 assert!(query.stop().is_err());
731 }
732
733 #[test]
736 fn submit_requires_running() {
737 let schema = default_schema();
738 let mut query = build_query_with_fallback("SELECT 1");
739 let arena = Bump::new();
740 let row = make_event_row(&arena, &schema, 1000, 1.0);
741
742 let result = query.submit_row(&row, 1000, 0);
744 assert!(result.is_err());
745 assert!(matches!(
746 result.unwrap_err(),
747 QueryError::InvalidState { .. }
748 ));
749 }
750
751 #[test]
752 fn submit_fallback_passthrough() {
753 let schema = default_schema();
754 let mut query = build_query_with_fallback("SELECT * FROM t");
755 query.start().unwrap();
756
757 let arena = Bump::new();
758 let row = make_event_row(&arena, &schema, 1000, 42.0);
759 let result = query.submit_row(&row, 1000, 0).unwrap();
760 assert_eq!(result, SubmitResult::Emitted);
761
762 query.advance_watermark(2000).unwrap();
764 let actions = query.poll_ring1();
765 assert!(!actions.is_empty());
766 }
767
768 #[test]
769 fn submit_compiled_emit() {
770 let schema = default_schema();
771 let mut query = build_query_with_compiled("SELECT * FROM t WHERE val > 0");
772 query.start().unwrap();
773
774 let arena = Bump::new();
775 let row = make_event_row(&arena, &schema, 1000, 5.0);
776 let result = query.submit_row(&row, 1000, 0).unwrap();
777 assert_eq!(result, SubmitResult::Emitted);
778 }
779
780 #[test]
781 fn submit_compiled_filter_drop() {
782 let schema = default_schema();
783 let (exec, bridge, consumer, s) = make_compiled_drop(0, &schema);
784 let mut query = StreamingQueryBuilder::new("SELECT * FROM t WHERE val > 100")
785 .add_pipeline(exec, bridge, consumer, s)
786 .build()
787 .unwrap();
788 query.start().unwrap();
789
790 let arena = Bump::new();
791 let row = make_event_row(&arena, &schema, 1000, 5.0);
792 let result = query.submit_row(&row, 1000, 0).unwrap();
793 assert_eq!(result, SubmitResult::Filtered);
794 }
795
796 #[test]
797 fn submit_multiple_pipelines() {
798 let schema = default_schema();
799 let (e1, b1, c1, s1) = make_compiled_emit(0, &schema);
800 let (e2, b2, c2, s2) = make_fallback(1, &schema);
801 let mut query = StreamingQueryBuilder::new("SELECT * FROM t")
802 .add_pipeline(e1, b1, c1, s1)
803 .add_pipeline(e2, b2, c2, s2)
804 .build()
805 .unwrap();
806 query.start().unwrap();
807
808 let arena = Bump::new();
809 let row = make_event_row(&arena, &schema, 1000, 1.0);
810 let result = query.submit_row(&row, 1000, 0).unwrap();
811 assert_eq!(result, SubmitResult::Emitted);
812 }
813
814 #[test]
815 fn submit_advance_watermark() {
816 let mut query = build_query_with_fallback("SELECT 1");
817 query.start().unwrap();
818 assert!(query.advance_watermark(5000).is_ok());
819 }
820
821 #[test]
822 fn submit_checkpoint() {
823 let mut query = build_query_with_fallback("SELECT 1");
824 query.start().unwrap();
825 assert!(query.checkpoint(42).is_ok());
826 }
827
828 #[test]
831 fn poll_empty() {
832 let mut query = build_query_with_fallback("SELECT 1");
833 query.start().unwrap();
834 let actions = query.poll_ring1();
835 assert!(actions.is_empty());
836 }
837
838 #[test]
839 fn poll_with_events() {
840 let schema = default_schema();
841 let mut query = build_query_with_fallback("SELECT * FROM t");
842 query.start().unwrap();
843
844 let arena = Bump::new();
845 let row = make_event_row(&arena, &schema, 1000, 1.0);
846 query.submit_row(&row, 1000, 0).unwrap();
847 query.advance_watermark(2000).unwrap();
848
849 let actions = query.poll_ring1();
850 assert!(!actions.is_empty());
851 assert!(actions.len() >= 2);
853 }
854
855 #[test]
856 fn poll_watermark_flush() {
857 let schema = default_schema();
858 let mut query = build_query_with_fallback("SELECT * FROM t");
859 query.start().unwrap();
860
861 let arena = Bump::new();
862 for i in 0..3 {
863 let row = make_event_row(&arena, &schema, i * 100, i as f64);
864 query.submit_row(&row, i * 100, 0).unwrap();
865 }
866 query.advance_watermark(1000).unwrap();
867
868 let actions = query.poll_ring1();
869 let batch_count = actions
870 .iter()
871 .filter(|a| matches!(a, Ring1Action::ProcessBatch(_)))
872 .count();
873 assert!(batch_count >= 1);
874 }
875
876 #[test]
877 fn poll_multiple_consumers() {
878 let schema = default_schema();
879 let (e1, b1, c1, s1) = make_fallback(0, &schema);
880 let (e2, b2, c2, s2) = make_fallback(1, &schema);
881 let mut query = StreamingQueryBuilder::new("SELECT * FROM t")
882 .add_pipeline(e1, b1, c1, s1)
883 .add_pipeline(e2, b2, c2, s2)
884 .build()
885 .unwrap();
886 query.start().unwrap();
887
888 let arena = Bump::new();
889 let row = make_event_row(&arena, &schema, 1000, 1.0);
890 query.submit_row(&row, 1000, 0).unwrap();
891 query.advance_watermark(2000).unwrap();
892
893 let actions = query.poll_ring1();
894 let watermark_count = actions
896 .iter()
897 .filter(|a| matches!(a, Ring1Action::AdvanceWatermark(_)))
898 .count();
899 assert_eq!(watermark_count, 2);
900 }
901
902 #[test]
905 fn swap_compatible() {
906 let mut query = build_query_with_fallback("SELECT 1 FROM t");
907 query.start().unwrap();
908
909 let new_query = build_query_with_compiled("SELECT 2 FROM t");
910 let old = query.swap(new_query).unwrap();
911
912 assert_eq!(old.state(), QueryState::Stopped);
913 assert_eq!(query.state(), QueryState::Running);
914 }
915
916 #[test]
917 fn swap_incompatible_count() {
918 let mut query = build_query_with_fallback("SELECT 1 FROM t");
919 query.start().unwrap();
920
921 let schema = default_schema();
923 let (e1, b1, c1, s1) = make_fallback(0, &schema);
924 let (e2, b2, c2, s2) = make_fallback(1, &schema);
925 let new_query = StreamingQueryBuilder::new("SELECT 2 FROM t")
926 .add_pipeline(e1, b1, c1, s1)
927 .add_pipeline(e2, b2, c2, s2)
928 .build()
929 .unwrap();
930
931 let result = query.swap(new_query);
932 assert!(result.is_err());
933 assert!(matches!(
934 result.unwrap_err(),
935 QueryError::IncompatibleSchemas(_)
936 ));
937 }
938
939 #[test]
940 fn swap_requires_running_or_paused() {
941 let mut query = build_query_with_fallback("SELECT 1 FROM t");
942 let new_query = build_query_with_fallback("SELECT 2 FROM t");
943
944 let result = query.swap(new_query);
946 assert!(result.is_err());
947 }
948
949 #[test]
952 fn metrics_initial_zero() {
953 let query = build_query_with_compiled("SELECT 1 FROM t");
954 let m = query.metrics();
955 assert_eq!(m.ring0_events_in, 0);
956 assert_eq!(m.ring0_events_out, 0);
957 assert_eq!(m.ring0_events_dropped, 0);
958 assert_eq!(m.pipelines_compiled, 1);
959 assert_eq!(m.pipelines_fallback, 0);
960 }
961
962 #[test]
963 fn metrics_after_submit() {
964 let schema = default_schema();
965 let mut query = build_query_with_compiled("SELECT * FROM t WHERE val > 0");
966 query.start().unwrap();
967
968 let arena = Bump::new();
969 for i in 0..5 {
970 let row = make_event_row(&arena, &schema, i * 100, i as f64);
971 query.submit_row(&row, i * 100, 0).unwrap();
972 }
973
974 let m = query.metrics();
975 assert_eq!(m.ring0_events_in, 5);
976 assert_eq!(m.ring0_events_out, 5);
977 assert_eq!(m.pipelines_compiled, 1);
978 }
979
980 #[test]
983 fn query_id_deterministic() {
984 let q1 = build_query_with_fallback("SELECT * FROM t");
985 let q2 = build_query_with_fallback("SELECT * FROM t");
986 assert_eq!(q1.id(), q2.id());
987
988 let q3 = build_query_with_fallback("SELECT * FROM other");
989 assert_ne!(q1.id(), q3.id());
990 }
991
992 #[test]
993 fn metadata_preserved() {
994 let schema = default_schema();
995 let (exec, bridge, consumer, s) = make_fallback(0, &schema);
996 let meta = QueryMetadata {
997 compiled_pipeline_count: 5,
998 fallback_pipeline_count: 3,
999 jit_enabled: true,
1000 ..Default::default()
1001 };
1002 let query = StreamingQueryBuilder::new("SELECT 1")
1003 .add_pipeline(exec, bridge, consumer, s)
1004 .with_metadata(meta)
1005 .build()
1006 .unwrap();
1007
1008 assert_eq!(query.metadata().compiled_pipeline_count, 5);
1009 assert_eq!(query.metadata().fallback_pipeline_count, 3);
1010 assert!(query.metadata().jit_enabled);
1011 assert_eq!(query.metadata().total_pipelines(), 8);
1012 }
1013}