Skip to main content

laminar_core/compiler/
query.rs

1//! Streaming query lifecycle management.
2//!
3//! [`StreamingQuery`] is the unified runtime object that wires compiled (or fallback)
4//! pipelines to their Ring 0 / Ring 1 bridges and tracks lifecycle state across the
5//! Ring 0 / Ring 1 boundary.
6//!
7//! # Usage
8//!
9//! ```ignore
10//! let query = StreamingQueryBuilder::new("SELECT ts, val FROM stream WHERE val > 10")
11//!     .add_pipeline(executable, bridge, consumer, schema)
12//!     .build()?;
13//! query.start()?;
14//! query.submit_row(&row, event_time, key_hash)?;
15//! let actions = query.poll_ring1();
16//! query.stop()?;
17//! ```
18
19use std::hash::{Hash, Hasher};
20use std::sync::atomic::Ordering;
21use std::sync::Arc;
22use std::time::Instant;
23
24use smallvec::SmallVec;
25
26use super::fallback::ExecutablePipeline;
27use super::metrics::{
28    QueryConfig, QueryError, QueryId, QueryMetadata, QueryMetrics, QueryState, SubmitResult,
29};
30use super::pipeline::PipelineAction;
31use super::pipeline_bridge::{BridgeConsumer, PipelineBridge, Ring1Action};
32use super::row::{EventRow, RowSchema};
33
34// ────────────────────────────── Builder ──────────────────────────────
35
/// Builder for constructing a [`StreamingQuery`] from pre-compiled components.
///
/// Each pipeline is added as a triplet: `(ExecutablePipeline, PipelineBridge, BridgeConsumer)`
/// with a corresponding output schema. The builder validates that all parallel vectors
/// have equal length before producing the query.
pub struct StreamingQueryBuilder {
    // Raw SQL text; also the input to the FxHash-derived QueryId in `build`.
    sql: String,
    // Parallel vectors: index i of each holds one pipeline's components.
    pipelines: Vec<ExecutablePipeline>,
    // Ring 0 → Ring 1 bridge for each pipeline.
    bridges: Vec<PipelineBridge>,
    // Ring 1 consumer side of each bridge.
    consumers: Vec<BridgeConsumer>,
    // Output row schema for each pipeline.
    schemas: Vec<Arc<RowSchema>>,
    // Runtime configuration (e.g. per-pipeline output buffer sizing).
    config: QueryConfig,
    // Compilation metadata forwarded into the built query.
    metadata: QueryMetadata,
}
50
51impl StreamingQueryBuilder {
52    /// Creates a new builder for the given SQL query text.
53    #[must_use]
54    pub fn new(sql: impl Into<String>) -> Self {
55        Self {
56            sql: sql.into(),
57            pipelines: Vec::new(),
58            bridges: Vec::new(),
59            consumers: Vec::new(),
60            schemas: Vec::new(),
61            config: QueryConfig::default(),
62            metadata: QueryMetadata::default(),
63        }
64    }
65
66    /// Adds a pipeline triplet (executable + bridge + consumer) with its output schema.
67    #[must_use]
68    pub fn add_pipeline(
69        mut self,
70        executable: ExecutablePipeline,
71        bridge: PipelineBridge,
72        consumer: BridgeConsumer,
73        schema: Arc<RowSchema>,
74    ) -> Self {
75        self.pipelines.push(executable);
76        self.bridges.push(bridge);
77        self.consumers.push(consumer);
78        self.schemas.push(schema);
79        self
80    }
81
82    /// Sets the query configuration.
83    #[must_use]
84    pub fn with_config(mut self, config: QueryConfig) -> Self {
85        self.config = config;
86        self
87    }
88
89    /// Sets the compilation metadata.
90    #[must_use]
91    pub fn with_metadata(mut self, metadata: QueryMetadata) -> Self {
92        self.metadata = metadata;
93        self
94    }
95
96    /// Builds the [`StreamingQuery`].
97    ///
98    /// # Errors
99    ///
100    /// Returns [`QueryError::NoPipelines`] if no pipelines were added.
101    /// Returns [`QueryError::Build`] if the parallel vectors have mismatched lengths.
102    pub fn build(self) -> Result<StreamingQuery, QueryError> {
103        if self.pipelines.is_empty() {
104            return Err(QueryError::NoPipelines);
105        }
106
107        let n = self.pipelines.len();
108        if self.bridges.len() != n || self.consumers.len() != n || self.schemas.len() != n {
109            return Err(QueryError::Build(format!(
110                "mismatched component counts: pipelines={n}, bridges={}, consumers={}, schemas={}",
111                self.bridges.len(),
112                self.consumers.len(),
113                self.schemas.len()
114            )));
115        }
116
117        let id = QueryId({
118            let mut hasher = rustc_hash::FxHasher::default();
119            self.sql.as_bytes().hash(&mut hasher);
120            hasher.finish()
121        });
122
123        let output_buffers = (0..n)
124            .map(|_| vec![0u8; self.config.output_buffer_size])
125            .collect();
126
127        Ok(StreamingQuery {
128            id,
129            sql: self.sql,
130            pipelines: self.pipelines,
131            bridges: self.bridges,
132            consumers: self.consumers,
133            schemas: self.schemas,
134            output_buffers,
135            metadata: self.metadata,
136            state: QueryState::Ready,
137        })
138    }
139}
140
141// ────────────────────────────── StreamingQuery ───────────────────────
142
/// A running streaming query that connects compiled pipelines to Ring 1 via bridges.
///
/// `StreamingQuery` manages the lifecycle (start/pause/resume/stop), processes
/// events through compiled or fallback pipelines, and produces [`Ring1Action`]s
/// for downstream stateful operators.
pub struct StreamingQuery {
    // Stable identifier derived by hashing `sql` (see the builder's `build`).
    id: QueryId,
    // Original SQL text this query was built from.
    sql: String,
    // Parallel vectors (same length, validated at build time): index i of each
    // holds the components of one pipeline.
    pipelines: Vec<ExecutablePipeline>,
    bridges: Vec<PipelineBridge>,
    consumers: Vec<BridgeConsumer>,
    schemas: Vec<Arc<RowSchema>>,
    // Scratch output buffer per pipeline, sized to `config.output_buffer_size`
    // at build time; compiled pipelines write their result row here.
    output_buffers: Vec<Vec<u8>>,
    // Compilation metadata captured at build time.
    metadata: QueryMetadata,
    // Current lifecycle state; `Stopped` is terminal.
    state: QueryState,
}
159
160impl StreamingQuery {
161    // ── Lifecycle ────────────────────────────────────────────────────
162
163    /// Transitions from [`QueryState::Ready`] to [`QueryState::Running`].
164    ///
165    /// # Errors
166    ///
167    /// Returns [`QueryError::InvalidState`] if not in `Ready` state.
168    pub fn start(&mut self) -> Result<(), QueryError> {
169        if self.state != QueryState::Ready {
170            return Err(QueryError::InvalidState {
171                expected: "Ready",
172                actual: self.state,
173            });
174        }
175        self.state = QueryState::Running;
176        Ok(())
177    }
178
179    /// Transitions from [`QueryState::Running`] to [`QueryState::Paused`].
180    ///
181    /// # Errors
182    ///
183    /// Returns [`QueryError::InvalidState`] if not in `Running` state.
184    pub fn pause(&mut self) -> Result<(), QueryError> {
185        if self.state != QueryState::Running {
186            return Err(QueryError::InvalidState {
187                expected: "Running",
188                actual: self.state,
189            });
190        }
191        self.state = QueryState::Paused;
192        Ok(())
193    }
194
195    /// Transitions from [`QueryState::Paused`] to [`QueryState::Running`].
196    ///
197    /// # Errors
198    ///
199    /// Returns [`QueryError::InvalidState`] if not in `Paused` state.
200    pub fn resume(&mut self) -> Result<(), QueryError> {
201        if self.state != QueryState::Paused {
202            return Err(QueryError::InvalidState {
203                expected: "Paused",
204                actual: self.state,
205            });
206        }
207        self.state = QueryState::Running;
208        Ok(())
209    }
210
211    /// Transitions to [`QueryState::Stopped`] from any non-terminal state.
212    ///
213    /// # Errors
214    ///
215    /// Returns [`QueryError::InvalidState`] if already in `Stopped` state.
216    pub fn stop(&mut self) -> Result<(), QueryError> {
217        if self.state == QueryState::Stopped {
218            return Err(QueryError::InvalidState {
219                expected: "Ready|Running|Paused",
220                actual: self.state,
221            });
222        }
223        self.state = QueryState::Stopped;
224        Ok(())
225    }
226
227    // ── Ring 0 — event submission ───────────────────────────────────
228
229    /// Submits an event row for processing through all pipelines.
230    ///
231    /// For compiled pipelines, the row is executed through the native function and
232    /// the result is sent to the corresponding bridge. For fallback pipelines, the
233    /// input row is sent directly (passthrough) for Ring 1 interpreted execution.
234    ///
235    /// # Errors
236    ///
237    /// Returns [`QueryError::InvalidState`] if not in `Running` state.
238    /// Returns [`QueryError::PipelineError`] if a compiled pipeline returns `Error`.
239    /// Returns [`QueryError::Bridge`] if a bridge send fails.
240    pub fn submit_row(
241        &mut self,
242        row: &EventRow<'_>,
243        event_time: i64,
244        key_hash: u64,
245    ) -> Result<SubmitResult, QueryError> {
246        if self.state != QueryState::Running {
247            return Err(QueryError::InvalidState {
248                expected: "Running",
249                actual: self.state,
250            });
251        }
252
253        let mut any_emitted = false;
254
255        for i in 0..self.pipelines.len() {
256            match &self.pipelines[i] {
257                ExecutablePipeline::Compiled(compiled) => {
258                    let start = Instant::now();
259                    let action = {
260                        let output_buf = &mut self.output_buffers[i];
261                        // SAFETY: input row data matches the compiled pipeline's input schema.
262                        // Output buffer is pre-allocated to output_buffer_size bytes.
263                        unsafe { compiled.execute(row.data().as_ptr(), output_buf.as_mut_ptr()) }
264                    };
265                    #[allow(clippy::cast_possible_truncation)]
266                    let elapsed_ns = start.elapsed().as_nanos() as u64;
267                    compiled.stats.record(action, elapsed_ns);
268
269                    match action {
270                        PipelineAction::Emit => {
271                            // Send the output row through the bridge.
272                            let output_schema = &compiled.output_schema;
273                            let output_row = EventRow::new(
274                                &self.output_buffers[i][..output_schema.min_row_size()],
275                                output_schema,
276                            );
277                            self.bridges[i].send_event(&output_row, event_time, key_hash)?;
278                            any_emitted = true;
279                        }
280                        PipelineAction::Drop => {
281                            // Filtered out — nothing to send.
282                        }
283                        PipelineAction::Error => {
284                            return Err(QueryError::PipelineError { pipeline_idx: i });
285                        }
286                    }
287                }
288                ExecutablePipeline::Fallback { .. } => {
289                    // Passthrough: send the input row directly for Ring 1 processing.
290                    self.bridges[i].send_event(row, event_time, key_hash)?;
291                    any_emitted = true;
292                }
293            }
294        }
295
296        if any_emitted {
297            Ok(SubmitResult::Emitted)
298        } else {
299            Ok(SubmitResult::Filtered)
300        }
301    }
302
303    // ── Ring 0 — control messages ───────────────────────────────────
304
305    /// Sends a watermark advance through all bridges.
306    ///
307    /// # Errors
308    ///
309    /// Returns [`QueryError::InvalidState`] if not in `Running` state.
310    /// Returns [`QueryError::Bridge`] if any bridge send fails.
311    pub fn advance_watermark(&self, timestamp: i64) -> Result<(), QueryError> {
312        if self.state != QueryState::Running {
313            return Err(QueryError::InvalidState {
314                expected: "Running",
315                actual: self.state,
316            });
317        }
318        for bridge in &self.bridges {
319            bridge.send_watermark(timestamp)?;
320        }
321        Ok(())
322    }
323
324    /// Sends a checkpoint barrier through all bridges.
325    ///
326    /// # Errors
327    ///
328    /// Returns [`QueryError::InvalidState`] if not in `Running` state.
329    /// Returns [`QueryError::Bridge`] if any bridge send fails.
330    pub fn checkpoint(&self, epoch: u64) -> Result<(), QueryError> {
331        if self.state != QueryState::Running {
332            return Err(QueryError::InvalidState {
333                expected: "Running",
334                actual: self.state,
335            });
336        }
337        for bridge in &self.bridges {
338            bridge.send_checkpoint(epoch)?;
339        }
340        Ok(())
341    }
342
343    /// Sends an end-of-stream marker through all bridges.
344    ///
345    /// # Errors
346    ///
347    /// Returns [`QueryError::Bridge`] if any bridge send fails.
348    pub fn send_eof(&self) -> Result<(), QueryError> {
349        for bridge in &self.bridges {
350            bridge.send_eof()?;
351        }
352        Ok(())
353    }
354
355    // ── Ring 1 — output polling ─────────────────────────────────────
356
357    /// Drains all consumers and returns concatenated Ring 1 actions.
358    pub fn poll_ring1(&mut self) -> SmallVec<[Ring1Action; 4]> {
359        let mut actions: SmallVec<[Ring1Action; 4]> = SmallVec::new();
360        for consumer in &mut self.consumers {
361            actions.extend(consumer.drain());
362        }
363        actions
364    }
365
366    /// Checks all consumers for latency-triggered flushes.
367    pub fn check_latency_flush(&mut self) -> SmallVec<[Ring1Action; 4]> {
368        let mut actions: SmallVec<[Ring1Action; 4]> = SmallVec::new();
369        for consumer in &mut self.consumers {
370            if let Some(action) = consumer.check_latency_flush() {
371                actions.push(action);
372            }
373        }
374        actions
375    }
376
377    // ── Hot-swap ────────────────────────────────────────────────────
378
379    /// Swaps this query's pipelines, bridges, and consumers with those from `new`.
380    ///
381    /// Both queries must have the same number of pipelines. The new query
382    /// inherits the current `Running` or `Paused` state.
383    ///
384    /// # Errors
385    ///
386    /// Returns [`QueryError::InvalidState`] if in `Ready` or `Stopped` state.
387    /// Returns [`QueryError::IncompatibleSchemas`] if pipeline counts differ.
388    pub fn swap(&mut self, mut new: StreamingQuery) -> Result<StreamingQuery, QueryError> {
389        if self.state != QueryState::Running && self.state != QueryState::Paused {
390            return Err(QueryError::InvalidState {
391                expected: "Running|Paused",
392                actual: self.state,
393            });
394        }
395        if self.pipelines.len() != new.pipelines.len() {
396            return Err(QueryError::IncompatibleSchemas(format!(
397                "pipeline count mismatch: current={}, new={}",
398                self.pipelines.len(),
399                new.pipelines.len()
400            )));
401        }
402
403        // Swap internals, new query gets old components.
404        std::mem::swap(&mut self.pipelines, &mut new.pipelines);
405        std::mem::swap(&mut self.bridges, &mut new.bridges);
406        std::mem::swap(&mut self.consumers, &mut new.consumers);
407        std::mem::swap(&mut self.schemas, &mut new.schemas);
408        std::mem::swap(&mut self.output_buffers, &mut new.output_buffers);
409        std::mem::swap(&mut self.metadata, &mut new.metadata);
410        std::mem::swap(&mut self.sql, &mut new.sql);
411        self.id = QueryId({
412            let mut hasher = rustc_hash::FxHasher::default();
413            self.sql.as_bytes().hash(&mut hasher);
414            hasher.finish()
415        });
416        new.state = QueryState::Stopped;
417
418        Ok(new)
419    }
420
421    // ── Accessors ───────────────────────────────────────────────────
422
423    /// Returns the query identifier.
424    #[must_use]
425    pub fn id(&self) -> QueryId {
426        self.id
427    }
428
429    /// Returns the SQL text for this query.
430    #[must_use]
431    pub fn sql(&self) -> &str {
432        &self.sql
433    }
434
435    /// Returns the current lifecycle state.
436    #[must_use]
437    pub fn state(&self) -> QueryState {
438        self.state
439    }
440
441    /// Returns compilation metadata.
442    #[must_use]
443    pub fn metadata(&self) -> &QueryMetadata {
444        &self.metadata
445    }
446
447    /// Returns the number of pipelines in this query.
448    #[must_use]
449    pub fn pipeline_count(&self) -> usize {
450        self.pipelines.len()
451    }
452
453    /// Aggregates runtime metrics from all pipelines and bridges.
454    #[must_use]
455    pub fn metrics(&self) -> QueryMetrics {
456        let mut m = QueryMetrics::default();
457
458        for pipeline in &self.pipelines {
459            match pipeline {
460                ExecutablePipeline::Compiled(compiled) => {
461                    m.pipelines_compiled += 1;
462                    m.ring0_events_in += compiled.stats.events_processed.load(Ordering::Relaxed);
463                    m.ring0_events_out += compiled.stats.events_emitted.load(Ordering::Relaxed);
464                    m.ring0_events_dropped += compiled.stats.events_dropped.load(Ordering::Relaxed);
465                    m.ring0_total_ns += compiled.stats.total_ns.load(Ordering::Relaxed);
466                }
467                ExecutablePipeline::Fallback { .. } => {
468                    m.pipelines_fallback += 1;
469                }
470            }
471        }
472
473        for consumer in &self.consumers {
474            let snap = consumer.stats().snapshot();
475            m.bridge_backpressure_drops += snap.events_dropped;
476            m.bridge_batches_flushed += snap.batches_flushed;
477            m.ring1_rows_flushed += snap.rows_flushed;
478        }
479
480        m
481    }
482}
483
484impl std::fmt::Debug for StreamingQuery {
485    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
486        f.debug_struct("StreamingQuery")
487            .field("id", &self.id)
488            .field("state", &self.state)
489            .field("pipeline_count", &self.pipelines.len())
490            .finish_non_exhaustive()
491    }
492}
493
494// ────────────────────────────── Tests ────────────────────────────────
495
496#[cfg(test)]
497#[allow(clippy::cast_precision_loss)]
498mod tests {
499    use super::*;
500    use crate::compiler::pipeline::{CompiledPipeline, PipelineId};
501    use crate::compiler::pipeline_bridge::create_pipeline_bridge;
502    use crate::compiler::policy::{BackpressureStrategy, BatchPolicy};
503    use crate::compiler::row::MutableEventRow;
504    use arrow_schema::{DataType, Field, Schema};
505    use bumpalo::Bump;
506
    // ── Helpers ──────────────────────────────────────────────────────

    /// Builds an Arrow schema from `(name, type)` pairs; all fields non-nullable.
    fn make_schema(fields: Vec<(&str, DataType)>) -> Arc<Schema> {
        Arc::new(Schema::new(
            fields
                .into_iter()
                .map(|(name, dt)| Field::new(name, dt, false))
                .collect::<Vec<_>>(),
        ))
    }

    /// Converts `(name, type)` pairs into the internal [`RowSchema`].
    fn make_row_schema(fields: Vec<(&str, DataType)>) -> Arc<RowSchema> {
        let arrow = make_schema(fields);
        Arc::new(RowSchema::from_arrow(&arrow).unwrap())
    }

    /// Creates a Fallback pipeline + bridge + consumer triplet.
    fn make_fallback(
        id: u32,
        row_schema: &Arc<RowSchema>,
    ) -> (
        ExecutablePipeline,
        PipelineBridge,
        BridgeConsumer,
        Arc<RowSchema>,
    ) {
        let exec = ExecutablePipeline::Fallback {
            pipeline_id: PipelineId(id),
            reason: crate::compiler::error::CompileError::UnsupportedExpr(
                "test fallback".to_string(),
            ),
        };
        // Bridge sized generously for unit tests (64 slots, 1024-byte rows).
        let (bridge, consumer) = create_pipeline_bridge(
            Arc::clone(row_schema),
            64,
            1024,
            BatchPolicy::default(),
            BackpressureStrategy::DropNewest,
        )
        .unwrap();
        (exec, bridge, consumer, Arc::clone(row_schema))
    }

    /// Creates a Compiled pipeline (always-emit) + bridge + consumer triplet.
    fn make_compiled_emit(
        id: u32,
        row_schema: &Arc<RowSchema>,
    ) -> (
        ExecutablePipeline,
        PipelineBridge,
        BridgeConsumer,
        Arc<RowSchema>,
    ) {
        unsafe extern "C" fn always_emit(input: *const u8, output: *mut u8) -> u8 {
            // Copy 64 bytes from input to output as a simple passthrough.
            // NOTE(review): assumes both buffers hold at least 64 bytes — the
            // output buffer is config-sized; confirm the input row allocation
            // is never smaller than 64 bytes.
            std::ptr::copy_nonoverlapping(input, output, 64);
            1 // Emit
        }

        let compiled = Arc::new(CompiledPipeline::new(
            PipelineId(id),
            always_emit,
            Arc::clone(row_schema),
            Arc::clone(row_schema),
        ));
        let exec = ExecutablePipeline::Compiled(compiled);
        let (bridge, consumer) = create_pipeline_bridge(
            Arc::clone(row_schema),
            64,
            1024,
            BatchPolicy::default(),
            BackpressureStrategy::DropNewest,
        )
        .unwrap();
        (exec, bridge, consumer, Arc::clone(row_schema))
    }

    /// Creates a Compiled pipeline (always-drop) + bridge + consumer triplet.
    fn make_compiled_drop(
        id: u32,
        row_schema: &Arc<RowSchema>,
    ) -> (
        ExecutablePipeline,
        PipelineBridge,
        BridgeConsumer,
        Arc<RowSchema>,
    ) {
        unsafe extern "C" fn always_drop(_: *const u8, _: *mut u8) -> u8 {
            0 // Drop
        }

        let compiled = Arc::new(CompiledPipeline::new(
            PipelineId(id),
            always_drop,
            Arc::clone(row_schema),
            Arc::clone(row_schema),
        ));
        let exec = ExecutablePipeline::Compiled(compiled);
        let (bridge, consumer) = create_pipeline_bridge(
            Arc::clone(row_schema),
            64,
            1024,
            BatchPolicy::default(),
            BackpressureStrategy::DropNewest,
        )
        .unwrap();
        (exec, bridge, consumer, Arc::clone(row_schema))
    }

    /// Builds a frozen `(ts, val)` event row in the given bump arena.
    fn make_event_row<'a>(
        arena: &'a Bump,
        schema: &'a RowSchema,
        ts: i64,
        val: f64,
    ) -> EventRow<'a> {
        let mut row = MutableEventRow::new_in(arena, schema, 0);
        row.set_i64(0, ts);
        row.set_f64(1, val);
        row.freeze()
    }

    /// Standard two-column test schema: `ts: Int64`, `val: Float64`.
    fn default_schema() -> Arc<RowSchema> {
        make_row_schema(vec![("ts", DataType::Int64), ("val", DataType::Float64)])
    }

    /// Builds a single-pipeline query whose pipeline is a Fallback (passthrough).
    fn build_query_with_fallback(sql: &str) -> StreamingQuery {
        let schema = default_schema();
        let (exec, bridge, consumer, s) = make_fallback(0, &schema);
        StreamingQueryBuilder::new(sql)
            .add_pipeline(exec, bridge, consumer, s)
            .build()
            .unwrap()
    }

    /// Builds a single-pipeline query whose pipeline is a Compiled always-emit.
    fn build_query_with_compiled(sql: &str) -> StreamingQuery {
        let schema = default_schema();
        let (exec, bridge, consumer, s) = make_compiled_emit(0, &schema);
        StreamingQueryBuilder::new(sql)
            .add_pipeline(exec, bridge, consumer, s)
            .build()
            .unwrap()
    }
649
    // ── Builder tests ───────────────────────────────────────────────

    /// Building with zero pipelines must fail with `NoPipelines`.
    #[test]
    fn builder_empty_error() {
        let result = StreamingQueryBuilder::new("SELECT 1").build();
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), QueryError::NoPipelines));
    }

    /// A single fallback pipeline builds into a `Ready` query.
    #[test]
    fn builder_single_fallback() {
        let query = build_query_with_fallback("SELECT * FROM t");
        assert_eq!(query.pipeline_count(), 1);
        assert_eq!(query.state(), QueryState::Ready);
    }

    /// A single compiled pipeline builds into a `Ready` query.
    #[test]
    fn builder_single_compiled() {
        let query = build_query_with_compiled("SELECT * FROM t WHERE val > 10");
        assert_eq!(query.pipeline_count(), 1);
        assert_eq!(query.state(), QueryState::Ready);
    }

    /// Mixed fallback + compiled pipelines can coexist in one query.
    #[test]
    fn builder_multiple_pipelines() {
        let schema = default_schema();
        let (e1, b1, c1, s1) = make_fallback(0, &schema);
        let (e2, b2, c2, s2) = make_compiled_emit(1, &schema);
        let query = StreamingQueryBuilder::new("SELECT * FROM t")
            .add_pipeline(e1, b1, c1, s1)
            .add_pipeline(e2, b2, c2, s2)
            .build()
            .unwrap();
        assert_eq!(query.pipeline_count(), 2);
    }
685
    // ── Lifecycle tests ─────────────────────────────────────────────

    /// `start` moves Ready → Running.
    #[test]
    fn lifecycle_ready_to_running() {
        let mut query = build_query_with_fallback("SELECT 1");
        assert!(query.start().is_ok());
        assert_eq!(query.state(), QueryState::Running);
    }

    /// `pause` moves Running → Paused.
    #[test]
    fn lifecycle_running_to_paused() {
        let mut query = build_query_with_fallback("SELECT 1");
        query.start().unwrap();
        assert!(query.pause().is_ok());
        assert_eq!(query.state(), QueryState::Paused);
    }

    /// `resume` moves Paused → Running.
    #[test]
    fn lifecycle_paused_to_running() {
        let mut query = build_query_with_fallback("SELECT 1");
        query.start().unwrap();
        query.pause().unwrap();
        assert!(query.resume().is_ok());
        assert_eq!(query.state(), QueryState::Running);
    }

    /// `stop` moves Running → Stopped.
    #[test]
    fn lifecycle_running_to_stopped() {
        let mut query = build_query_with_fallback("SELECT 1");
        query.start().unwrap();
        assert!(query.stop().is_ok());
        assert_eq!(query.state(), QueryState::Stopped);
    }

    /// Stopped is terminal: every further transition must error.
    #[test]
    fn lifecycle_stopped_terminal() {
        let mut query = build_query_with_fallback("SELECT 1");
        query.start().unwrap();
        query.stop().unwrap();

        // Cannot start, pause, resume, or stop again.
        assert!(query.start().is_err());
        assert!(query.pause().is_err());
        assert!(query.resume().is_err());
        assert!(query.stop().is_err());
    }
732
    // ── Submit tests ────────────────────────────────────────────────

    /// Submitting before `start` must fail with `InvalidState`.
    #[test]
    fn submit_requires_running() {
        let schema = default_schema();
        let mut query = build_query_with_fallback("SELECT 1");
        let arena = Bump::new();
        let row = make_event_row(&arena, &schema, 1000, 1.0);

        // Not started yet — should fail.
        let result = query.submit_row(&row, 1000, 0);
        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            QueryError::InvalidState { .. }
        ));
    }

    /// A fallback pipeline forwards the input row unchanged (Emitted),
    /// and the event becomes visible after a watermark-triggered flush.
    #[test]
    fn submit_fallback_passthrough() {
        let schema = default_schema();
        let mut query = build_query_with_fallback("SELECT * FROM t");
        query.start().unwrap();

        let arena = Bump::new();
        let row = make_event_row(&arena, &schema, 1000, 42.0);
        let result = query.submit_row(&row, 1000, 0).unwrap();
        assert_eq!(result, SubmitResult::Emitted);

        // Flush via watermark and poll.
        query.advance_watermark(2000).unwrap();
        let actions = query.poll_ring1();
        assert!(!actions.is_empty());
    }

    /// A compiled always-emit pipeline reports `Emitted`.
    #[test]
    fn submit_compiled_emit() {
        let schema = default_schema();
        let mut query = build_query_with_compiled("SELECT * FROM t WHERE val > 0");
        query.start().unwrap();

        let arena = Bump::new();
        let row = make_event_row(&arena, &schema, 1000, 5.0);
        let result = query.submit_row(&row, 1000, 0).unwrap();
        assert_eq!(result, SubmitResult::Emitted);
    }

    /// A compiled always-drop pipeline reports `Filtered`.
    #[test]
    fn submit_compiled_filter_drop() {
        let schema = default_schema();
        let (exec, bridge, consumer, s) = make_compiled_drop(0, &schema);
        let mut query = StreamingQueryBuilder::new("SELECT * FROM t WHERE val > 100")
            .add_pipeline(exec, bridge, consumer, s)
            .build()
            .unwrap();
        query.start().unwrap();

        let arena = Bump::new();
        let row = make_event_row(&arena, &schema, 1000, 5.0);
        let result = query.submit_row(&row, 1000, 0).unwrap();
        assert_eq!(result, SubmitResult::Filtered);
    }

    /// With mixed pipelines, any emitting pipeline yields `Emitted`.
    #[test]
    fn submit_multiple_pipelines() {
        let schema = default_schema();
        let (e1, b1, c1, s1) = make_compiled_emit(0, &schema);
        let (e2, b2, c2, s2) = make_fallback(1, &schema);
        let mut query = StreamingQueryBuilder::new("SELECT * FROM t")
            .add_pipeline(e1, b1, c1, s1)
            .add_pipeline(e2, b2, c2, s2)
            .build()
            .unwrap();
        query.start().unwrap();

        let arena = Bump::new();
        let row = make_event_row(&arena, &schema, 1000, 1.0);
        let result = query.submit_row(&row, 1000, 0).unwrap();
        assert_eq!(result, SubmitResult::Emitted);
    }

    /// Watermarks are accepted while Running.
    #[test]
    fn submit_advance_watermark() {
        let mut query = build_query_with_fallback("SELECT 1");
        query.start().unwrap();
        assert!(query.advance_watermark(5000).is_ok());
    }

    /// Checkpoint barriers are accepted while Running.
    #[test]
    fn submit_checkpoint() {
        let mut query = build_query_with_fallback("SELECT 1");
        query.start().unwrap();
        assert!(query.checkpoint(42).is_ok());
    }
827
    // ── Poll tests ──────────────────────────────────────────────────

    /// Polling with no submitted events yields no actions.
    #[test]
    fn poll_empty() {
        let mut query = build_query_with_fallback("SELECT 1");
        query.start().unwrap();
        let actions = query.poll_ring1();
        assert!(actions.is_empty());
    }

    /// Submitting an event and advancing the watermark produces actions.
    #[test]
    fn poll_with_events() {
        let schema = default_schema();
        let mut query = build_query_with_fallback("SELECT * FROM t");
        query.start().unwrap();

        let arena = Bump::new();
        let row = make_event_row(&arena, &schema, 1000, 1.0);
        query.submit_row(&row, 1000, 0).unwrap();
        query.advance_watermark(2000).unwrap();

        let actions = query.poll_ring1();
        assert!(!actions.is_empty());
        // Should have ProcessBatch + AdvanceWatermark.
        assert!(actions.len() >= 2);
    }

    /// Multiple buffered events are flushed as at least one batch on watermark.
    #[test]
    fn poll_watermark_flush() {
        let schema = default_schema();
        let mut query = build_query_with_fallback("SELECT * FROM t");
        query.start().unwrap();

        let arena = Bump::new();
        for i in 0..3 {
            let row = make_event_row(&arena, &schema, i * 100, i as f64);
            query.submit_row(&row, i * 100, 0).unwrap();
        }
        query.advance_watermark(1000).unwrap();

        let actions = query.poll_ring1();
        let batch_count = actions
            .iter()
            .filter(|a| matches!(a, Ring1Action::ProcessBatch(_)))
            .count();
        assert!(batch_count >= 1);
    }

    /// A watermark is broadcast to every bridge, so each consumer reports one.
    #[test]
    fn poll_multiple_consumers() {
        let schema = default_schema();
        let (e1, b1, c1, s1) = make_fallback(0, &schema);
        let (e2, b2, c2, s2) = make_fallback(1, &schema);
        let mut query = StreamingQueryBuilder::new("SELECT * FROM t")
            .add_pipeline(e1, b1, c1, s1)
            .add_pipeline(e2, b2, c2, s2)
            .build()
            .unwrap();
        query.start().unwrap();

        let arena = Bump::new();
        let row = make_event_row(&arena, &schema, 1000, 1.0);
        query.submit_row(&row, 1000, 0).unwrap();
        query.advance_watermark(2000).unwrap();

        let actions = query.poll_ring1();
        // Both consumers should have actions.
        let watermark_count = actions
            .iter()
            .filter(|a| matches!(a, Ring1Action::AdvanceWatermark(_)))
            .count();
        assert_eq!(watermark_count, 2);
    }
901
902    // ── Hot-swap tests ──────────────────────────────────────────────
903
904    #[test]
905    fn swap_compatible() {
906        let mut query = build_query_with_fallback("SELECT 1 FROM t");
907        query.start().unwrap();
908
909        let new_query = build_query_with_compiled("SELECT 2 FROM t");
910        let old = query.swap(new_query).unwrap();
911
912        assert_eq!(old.state(), QueryState::Stopped);
913        assert_eq!(query.state(), QueryState::Running);
914    }
915
916    #[test]
917    fn swap_incompatible_count() {
918        let mut query = build_query_with_fallback("SELECT 1 FROM t");
919        query.start().unwrap();
920
921        // Build a query with 2 pipelines.
922        let schema = default_schema();
923        let (e1, b1, c1, s1) = make_fallback(0, &schema);
924        let (e2, b2, c2, s2) = make_fallback(1, &schema);
925        let new_query = StreamingQueryBuilder::new("SELECT 2 FROM t")
926            .add_pipeline(e1, b1, c1, s1)
927            .add_pipeline(e2, b2, c2, s2)
928            .build()
929            .unwrap();
930
931        let result = query.swap(new_query);
932        assert!(result.is_err());
933        assert!(matches!(
934            result.unwrap_err(),
935            QueryError::IncompatibleSchemas(_)
936        ));
937    }
938
939    #[test]
940    fn swap_requires_running_or_paused() {
941        let mut query = build_query_with_fallback("SELECT 1 FROM t");
942        let new_query = build_query_with_fallback("SELECT 2 FROM t");
943
944        // Still in Ready state — should fail.
945        let result = query.swap(new_query);
946        assert!(result.is_err());
947    }
948
949    // ── Metrics tests ───────────────────────────────────────────────
950
951    #[test]
952    fn metrics_initial_zero() {
953        let query = build_query_with_compiled("SELECT 1 FROM t");
954        let m = query.metrics();
955        assert_eq!(m.ring0_events_in, 0);
956        assert_eq!(m.ring0_events_out, 0);
957        assert_eq!(m.ring0_events_dropped, 0);
958        assert_eq!(m.pipelines_compiled, 1);
959        assert_eq!(m.pipelines_fallback, 0);
960    }
961
962    #[test]
963    fn metrics_after_submit() {
964        let schema = default_schema();
965        let mut query = build_query_with_compiled("SELECT * FROM t WHERE val > 0");
966        query.start().unwrap();
967
968        let arena = Bump::new();
969        for i in 0..5 {
970            let row = make_event_row(&arena, &schema, i * 100, i as f64);
971            query.submit_row(&row, i * 100, 0).unwrap();
972        }
973
974        let m = query.metrics();
975        assert_eq!(m.ring0_events_in, 5);
976        assert_eq!(m.ring0_events_out, 5);
977        assert_eq!(m.pipelines_compiled, 1);
978    }
979
980    // ── Accessor tests ──────────────────────────────────────────────
981
982    #[test]
983    fn query_id_deterministic() {
984        let q1 = build_query_with_fallback("SELECT * FROM t");
985        let q2 = build_query_with_fallback("SELECT * FROM t");
986        assert_eq!(q1.id(), q2.id());
987
988        let q3 = build_query_with_fallback("SELECT * FROM other");
989        assert_ne!(q1.id(), q3.id());
990    }
991
992    #[test]
993    fn metadata_preserved() {
994        let schema = default_schema();
995        let (exec, bridge, consumer, s) = make_fallback(0, &schema);
996        let meta = QueryMetadata {
997            compiled_pipeline_count: 5,
998            fallback_pipeline_count: 3,
999            jit_enabled: true,
1000            ..Default::default()
1001        };
1002        let query = StreamingQueryBuilder::new("SELECT 1")
1003            .add_pipeline(exec, bridge, consumer, s)
1004            .with_metadata(meta)
1005            .build()
1006            .unwrap();
1007
1008        assert_eq!(query.metadata().compiled_pipeline_count, 5);
1009        assert_eq!(query.metadata().fallback_pipeline_count, 3);
1010        assert!(query.metadata().jit_enabled);
1011        assert_eq!(query.metadata().total_pipelines(), 8);
1012    }
1013}