1use std::fmt;
7use std::time::Duration;
8
9use super::pipeline_bridge::PipelineBridgeError;
10use super::policy::{BackpressureStrategy, BatchPolicy};
11
12#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
16pub struct QueryId(pub u64);
17
18impl fmt::Display for QueryId {
19 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
20 write!(f, "0x{:016x}", self.0)
21 }
22}
23
24#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28pub enum QueryState {
29 Ready,
31 Running,
33 Paused,
35 Stopped,
37}
38
39impl QueryState {
40 #[must_use]
42 pub fn is_terminal(self) -> bool {
43 matches!(self, Self::Stopped)
44 }
45}
46
47impl fmt::Display for QueryState {
48 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49 match self {
50 Self::Ready => write!(f, "Ready"),
51 Self::Running => write!(f, "Running"),
52 Self::Paused => write!(f, "Paused"),
53 Self::Stopped => write!(f, "Stopped"),
54 }
55 }
56}
57
58#[derive(Debug, Clone)]
62pub struct QueryConfig {
63 pub jit_enabled: bool,
65 pub batch_policy: BatchPolicy,
67 pub backpressure: BackpressureStrategy,
69 pub max_cache_entries: usize,
71 pub queue_capacity: usize,
73 pub output_buffer_size: usize,
75}
76
77impl Default for QueryConfig {
78 fn default() -> Self {
79 Self {
80 jit_enabled: cfg!(feature = "jit"),
81 batch_policy: BatchPolicy::default(),
82 backpressure: BackpressureStrategy::default(),
83 max_cache_entries: 64,
84 queue_capacity: 4096,
85 output_buffer_size: 8192,
86 }
87 }
88}
89
90#[derive(Debug, Clone, Default)]
94pub enum StateStoreConfig {
95 #[default]
97 InMemory,
98}
99
100#[derive(Debug, thiserror::Error)]
104pub enum QueryError {
105 #[error("invalid state: expected {expected}, actual {actual}")]
107 InvalidState {
108 expected: &'static str,
110 actual: QueryState,
112 },
113 #[error("bridge error: {0}")]
115 Bridge(#[from] PipelineBridgeError),
116 #[error("pipeline {pipeline_idx} execution error")]
118 PipelineError {
119 pipeline_idx: usize,
121 },
122 #[error("incompatible schemas: {0}")]
124 IncompatibleSchemas(String),
125 #[error("no pipelines added to query builder")]
127 NoPipelines,
128 #[error("build error: {0}")]
130 Build(String),
131}
132
133#[derive(Debug, Clone, Copy, PartialEq, Eq)]
137pub enum SubmitResult {
138 Emitted,
140 Filtered,
142}
143
144#[derive(Debug, Clone, Default)]
148pub struct QueryMetadata {
149 pub plan_time: Duration,
151 pub extract_time: Duration,
153 pub compile_time: Duration,
155 pub compiled_pipeline_count: usize,
157 pub fallback_pipeline_count: usize,
159 pub jit_enabled: bool,
161}
162
163impl QueryMetadata {
164 #[must_use]
166 pub fn total_pipelines(&self) -> usize {
167 self.compiled_pipeline_count + self.fallback_pipeline_count
168 }
169}
170
171#[derive(Debug, Clone, Default, PartialEq, Eq)]
175pub struct QueryMetrics {
176 pub ring0_events_in: u64,
178 pub ring0_events_out: u64,
180 pub ring0_events_dropped: u64,
182 pub ring0_total_ns: u64,
184 pub bridge_pending: u64,
186 pub bridge_backpressure_drops: u64,
188 pub bridge_batches_flushed: u64,
190 pub ring1_rows_flushed: u64,
192 pub pipelines_compiled: u64,
194 pub pipelines_fallback: u64,
196}
197
198#[cfg(test)]
201#[allow(clippy::single_char_pattern)]
202mod tests {
203 use super::*;
204
205 #[test]
206 fn query_id_display() {
207 let id = QueryId(0xDEAD_BEEF_CAFE_BABE);
208 let s = format!("{id}");
209 assert_eq!(s, "0xdeadbeefcafebabe");
210 }
211
212 #[test]
213 fn query_state_is_terminal() {
214 assert!(!QueryState::Ready.is_terminal());
215 assert!(!QueryState::Running.is_terminal());
216 assert!(!QueryState::Paused.is_terminal());
217 assert!(QueryState::Stopped.is_terminal());
218 }
219
220 #[test]
221 fn query_config_defaults() {
222 let config = QueryConfig::default();
223 assert_eq!(config.max_cache_entries, 64);
224 assert_eq!(config.queue_capacity, 4096);
225 assert_eq!(config.output_buffer_size, 8192);
226 assert!(matches!(
227 config.backpressure,
228 BackpressureStrategy::DropNewest
229 ));
230 }
231
232 #[test]
233 fn query_metadata_total_pipelines() {
234 let meta = QueryMetadata {
235 compiled_pipeline_count: 3,
236 fallback_pipeline_count: 2,
237 ..Default::default()
238 };
239 assert_eq!(meta.total_pipelines(), 5);
240 }
241
242 #[test]
243 fn query_error_display() {
244 let err = QueryError::InvalidState {
245 expected: "Running",
246 actual: QueryState::Ready,
247 };
248 let s = format!("{err}");
249 assert!(s.contains("Running"));
250 assert!(s.contains("Ready"));
251
252 let err2 = QueryError::NoPipelines;
253 assert!(format!("{err2}").contains("no pipelines"));
254
255 let err3 = QueryError::PipelineError { pipeline_idx: 5 };
256 assert!(format!("{err3}").contains("5"));
257 }
258
259 #[test]
260 fn submit_result_variants() {
261 assert_eq!(SubmitResult::Emitted, SubmitResult::Emitted);
262 assert_ne!(SubmitResult::Emitted, SubmitResult::Filtered);
263 }
264
265 #[test]
266 fn query_metrics_default() {
267 let m = QueryMetrics::default();
268 assert_eq!(m.ring0_events_in, 0);
269 assert_eq!(m.ring0_events_out, 0);
270 assert_eq!(m.ring0_events_dropped, 0);
271 assert_eq!(m.ring0_total_ns, 0);
272 assert_eq!(m.bridge_pending, 0);
273 assert_eq!(m.bridge_backpressure_drops, 0);
274 assert_eq!(m.bridge_batches_flushed, 0);
275 assert_eq!(m.ring1_rows_flushed, 0);
276 assert_eq!(m.pipelines_compiled, 0);
277 assert_eq!(m.pipelines_fallback, 0);
278 }
279
280 #[test]
281 fn state_store_config_default() {
282 let c = StateStoreConfig::default();
283 assert!(matches!(c, StateStoreConfig::InMemory));
284 }
285}