Skip to main content

laminar_core/compiler/
metrics.rs

1//! Streaming query lifecycle types: identifiers, states, configuration, errors, and metrics.
2//!
3//! These types are always available (not gated behind `jit`) so that configuration,
4//! error handling, and metrics aggregation can be used without pulling in Cranelift.
5
6use std::fmt;
7use std::time::Duration;
8
9use super::pipeline_bridge::PipelineBridgeError;
10use super::policy::{BackpressureStrategy, BatchPolicy};
11
12// ────────────────────────────── QueryId ──────────────────────────────
13
14/// Unique identifier for a streaming query, derived from hashing the SQL text.
15#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
16pub struct QueryId(pub u64);
17
18impl fmt::Display for QueryId {
19    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
20        write!(f, "0x{:016x}", self.0)
21    }
22}
23
24// ────────────────────────────── QueryState ───────────────────────────
25
26/// Lifecycle state of a `StreamingQuery` (requires `jit` feature).
27#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28pub enum QueryState {
29    /// Query is built but not yet started.
30    Ready,
31    /// Query is actively processing events.
32    Running,
33    /// Query is temporarily paused (can be resumed).
34    Paused,
35    /// Query has been stopped (terminal state).
36    Stopped,
37}
38
39impl QueryState {
40    /// Returns `true` if this is a terminal state (no further transitions allowed).
41    #[must_use]
42    pub fn is_terminal(self) -> bool {
43        matches!(self, Self::Stopped)
44    }
45}
46
47impl fmt::Display for QueryState {
48    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49        match self {
50            Self::Ready => write!(f, "Ready"),
51            Self::Running => write!(f, "Running"),
52            Self::Paused => write!(f, "Paused"),
53            Self::Stopped => write!(f, "Stopped"),
54        }
55    }
56}
57
58// ────────────────────────────── QueryConfig ──────────────────────────
59
60/// Configuration for a streaming query.
61#[derive(Debug, Clone)]
62pub struct QueryConfig {
63    /// Whether JIT compilation is enabled.
64    pub jit_enabled: bool,
65    /// Batching policy for the Ring 0 → Ring 1 bridge.
66    pub batch_policy: BatchPolicy,
67    /// Backpressure strategy for the bridge producer.
68    pub backpressure: BackpressureStrategy,
69    /// Maximum entries in the pipeline compiler cache.
70    pub max_cache_entries: usize,
71    /// SPSC queue capacity for each bridge.
72    pub queue_capacity: usize,
73    /// Output buffer size in bytes for each compiled pipeline.
74    pub output_buffer_size: usize,
75}
76
77impl Default for QueryConfig {
78    fn default() -> Self {
79        Self {
80            jit_enabled: cfg!(feature = "jit"),
81            batch_policy: BatchPolicy::default(),
82            backpressure: BackpressureStrategy::default(),
83            max_cache_entries: 64,
84            queue_capacity: 4096,
85            output_buffer_size: 8192,
86        }
87    }
88}
89
90// ────────────────────────────── StateStoreConfig ─────────────────────
91
92/// State store backend configuration (kept minimal for now).
93#[derive(Debug, Clone, Default)]
94pub enum StateStoreConfig {
95    /// In-memory state store (default).
96    #[default]
97    InMemory,
98}
99
100// ────────────────────────────── QueryError ───────────────────────────
101
102/// Errors from streaming query operations.
103#[derive(Debug, thiserror::Error)]
104pub enum QueryError {
105    /// The query is in the wrong state for this operation.
106    #[error("invalid state: expected {expected}, actual {actual}")]
107    InvalidState {
108        /// The expected state description.
109        expected: &'static str,
110        /// The actual state.
111        actual: QueryState,
112    },
113    /// Bridge error during event submission or control message send.
114    #[error("bridge error: {0}")]
115    Bridge(#[from] PipelineBridgeError),
116    /// Pipeline execution failed.
117    #[error("pipeline {pipeline_idx} execution error")]
118    PipelineError {
119        /// Index of the failed pipeline.
120        pipeline_idx: usize,
121    },
122    /// Schemas are incompatible (e.g., during hot-swap).
123    #[error("incompatible schemas: {0}")]
124    IncompatibleSchemas(String),
125    /// No pipelines were added to the builder.
126    #[error("no pipelines added to query builder")]
127    NoPipelines,
128    /// Build-time error.
129    #[error("build error: {0}")]
130    Build(String),
131}
132
133// ────────────────────────────── SubmitResult ─────────────────────────
134
135/// Result of submitting an event row to a streaming query.
136#[derive(Debug, Clone, Copy, PartialEq, Eq)]
137pub enum SubmitResult {
138    /// At least one pipeline emitted output for this row.
139    Emitted,
140    /// All pipelines filtered out this row.
141    Filtered,
142}
143
144// ────────────────────────────── QueryMetadata ────────────────────────
145
146/// Metadata about how a query was compiled.
147#[derive(Debug, Clone, Default)]
148pub struct QueryMetadata {
149    /// Time spent planning the query.
150    pub plan_time: Duration,
151    /// Time spent extracting pipelines from the logical plan.
152    pub extract_time: Duration,
153    /// Time spent compiling pipelines via Cranelift.
154    pub compile_time: Duration,
155    /// Number of pipelines successfully compiled to native code.
156    pub compiled_pipeline_count: usize,
157    /// Number of pipelines using fallback (interpreted) execution.
158    pub fallback_pipeline_count: usize,
159    /// Whether JIT was enabled for this compilation.
160    pub jit_enabled: bool,
161}
162
163impl QueryMetadata {
164    /// Returns the total number of pipelines (compiled + fallback).
165    #[must_use]
166    pub fn total_pipelines(&self) -> usize {
167        self.compiled_pipeline_count + self.fallback_pipeline_count
168    }
169}
170
171// ────────────────────────────── QueryMetrics ─────────────────────────
172
173/// Aggregated runtime metrics for a streaming query.
174#[derive(Debug, Clone, Default, PartialEq, Eq)]
175pub struct QueryMetrics {
176    /// Total events submitted to Ring 0.
177    pub ring0_events_in: u64,
178    /// Events emitted from Ring 0 to bridges.
179    pub ring0_events_out: u64,
180    /// Events dropped by Ring 0 pipelines (filtered out).
181    pub ring0_events_dropped: u64,
182    /// Cumulative Ring 0 processing time in nanoseconds.
183    pub ring0_total_ns: u64,
184    /// Events pending in bridge queues.
185    pub bridge_pending: u64,
186    /// Events dropped due to bridge backpressure.
187    pub bridge_backpressure_drops: u64,
188    /// Total batches flushed to Ring 1.
189    pub bridge_batches_flushed: u64,
190    /// Total rows flushed to Ring 1 across all batches.
191    pub ring1_rows_flushed: u64,
192    /// Number of compiled (JIT) pipelines.
193    pub pipelines_compiled: u64,
194    /// Number of fallback (interpreted) pipelines.
195    pub pipelines_fallback: u64,
196}
197
198// ────────────────────────────── Tests ────────────────────────────────
199
200#[cfg(test)]
201#[allow(clippy::single_char_pattern)]
202mod tests {
203    use super::*;
204
205    #[test]
206    fn query_id_display() {
207        let id = QueryId(0xDEAD_BEEF_CAFE_BABE);
208        let s = format!("{id}");
209        assert_eq!(s, "0xdeadbeefcafebabe");
210    }
211
212    #[test]
213    fn query_state_is_terminal() {
214        assert!(!QueryState::Ready.is_terminal());
215        assert!(!QueryState::Running.is_terminal());
216        assert!(!QueryState::Paused.is_terminal());
217        assert!(QueryState::Stopped.is_terminal());
218    }
219
220    #[test]
221    fn query_config_defaults() {
222        let config = QueryConfig::default();
223        assert_eq!(config.max_cache_entries, 64);
224        assert_eq!(config.queue_capacity, 4096);
225        assert_eq!(config.output_buffer_size, 8192);
226        assert!(matches!(
227            config.backpressure,
228            BackpressureStrategy::DropNewest
229        ));
230    }
231
232    #[test]
233    fn query_metadata_total_pipelines() {
234        let meta = QueryMetadata {
235            compiled_pipeline_count: 3,
236            fallback_pipeline_count: 2,
237            ..Default::default()
238        };
239        assert_eq!(meta.total_pipelines(), 5);
240    }
241
242    #[test]
243    fn query_error_display() {
244        let err = QueryError::InvalidState {
245            expected: "Running",
246            actual: QueryState::Ready,
247        };
248        let s = format!("{err}");
249        assert!(s.contains("Running"));
250        assert!(s.contains("Ready"));
251
252        let err2 = QueryError::NoPipelines;
253        assert!(format!("{err2}").contains("no pipelines"));
254
255        let err3 = QueryError::PipelineError { pipeline_idx: 5 };
256        assert!(format!("{err3}").contains("5"));
257    }
258
259    #[test]
260    fn submit_result_variants() {
261        assert_eq!(SubmitResult::Emitted, SubmitResult::Emitted);
262        assert_ne!(SubmitResult::Emitted, SubmitResult::Filtered);
263    }
264
265    #[test]
266    fn query_metrics_default() {
267        let m = QueryMetrics::default();
268        assert_eq!(m.ring0_events_in, 0);
269        assert_eq!(m.ring0_events_out, 0);
270        assert_eq!(m.ring0_events_dropped, 0);
271        assert_eq!(m.ring0_total_ns, 0);
272        assert_eq!(m.bridge_pending, 0);
273        assert_eq!(m.bridge_backpressure_drops, 0);
274        assert_eq!(m.bridge_batches_flushed, 0);
275        assert_eq!(m.ring1_rows_flushed, 0);
276        assert_eq!(m.pipelines_compiled, 0);
277        assert_eq!(m.pipelines_fallback, 0);
278    }
279
280    #[test]
281    fn state_store_config_default() {
282        let c = StateStoreConfig::default();
283        assert!(matches!(c, StateStoreConfig::InMemory));
284    }
285}