laminar_core/mv/mod.rs
1//! # Cascading Materialized Views
2//!
3//! Materialized views that can read from other materialized views, forming a DAG.
4//! Essential for multi-resolution time-series aggregation (e.g., 1s → 1m → 1h OHLC bars).
5//!
6//! ## Key Components
7//!
8//! - [`MvRegistry`] - Manages view definitions with dependency tracking
9//! - [`MaterializedView`] - View definition with SQL, dependencies, and schema
10//!
11//! ## Architecture
12//!
13//! ```text
14//! ┌─────────────────────────────────────────────────────────────┐
15//! │                         MV Pipeline                         │
16//! ├─────────────────────────────────────────────────────────────┤
17//! │                                                             │
18//! │  Base Table        MV Level 1         MV Level 2            │
19//! │  ┌─────────┐       ┌─────────┐        ┌─────────┐           │
20//! │  │ trades  │──────▶│ ohlc_1s │───────▶│ ohlc_1m │──────▶    │
21//! │  └─────────┘       └─────────┘        └─────────┘           │
22//! │       │                 │                  │                │
23//! │       ▼                 ▼                  ▼                │
24//! │   Watermark ─────▶  Watermark ───────▶ Watermark            │
25//! │   (source)      (min of sources)   (min of sources)         │
26//! │                                                             │
27//! └─────────────────────────────────────────────────────────────┘
28//! ```
29//!
30//! ## Example: Cascading OHLC Bars
31//!
32//! ```rust,ignore
33//! use laminar_core::mv::{MvRegistry, MaterializedView, MvPipelineExecutor};
34//! use arrow_schema::{Schema, Field, DataType};
35//! use std::sync::Arc;
36//!
37//! // Create registry
38//! let mut registry = MvRegistry::new();
39//! registry.register_base_table("trades");
40//!
41//! // Define schema for OHLC bars
42//! let schema = Arc::new(Schema::new(vec![
43//! Field::new("symbol", DataType::Utf8, false),
44//! Field::new("open", DataType::Float64, false),
45//! Field::new("high", DataType::Float64, false),
46//! Field::new("low", DataType::Float64, false),
47//! Field::new("close", DataType::Float64, false),
48//! Field::new("volume", DataType::Int64, false),
49//! ]));
50//!
51//! // Register cascading views: trades -> ohlc_1s -> ohlc_1m
52//! let ohlc_1s = MaterializedView::new(
53//! "ohlc_1s",
54//! "SELECT symbol, FIRST_VALUE(price), MAX(price), MIN(price), LAST_VALUE(price), SUM(qty) FROM trades GROUP BY symbol, TUMBLE(ts, '1 second')",
55//! vec!["trades".into()],
56//! schema.clone(),
57//! );
58//! registry.register(ohlc_1s).unwrap();
59//!
60//! let ohlc_1m = MaterializedView::new(
61//! "ohlc_1m",
62//! "SELECT symbol, FIRST_VALUE(open), MAX(high), MIN(low), LAST_VALUE(close), SUM(volume) FROM ohlc_1s GROUP BY symbol, TUMBLE(bar_time, '1 minute')",
63//! vec!["ohlc_1s".into()],
64//! schema.clone(),
65//! );
66//! registry.register(ohlc_1m).unwrap();
67//!
68//! // Views are processed in topological order
69//! assert_eq!(registry.topo_order(), &["ohlc_1s", "ohlc_1m"]);
70//!
71//! // Create executor
72//! let registry = Arc::new(registry);
73//! let mut executor = MvPipelineExecutor::new(Arc::clone(&registry));
74//!
75//! // Register operators for each view
76//! executor.register_operator("ohlc_1s", Box::new(PassThroughOperator::new("ohlc_1s"))).unwrap();
77//! executor.register_operator("ohlc_1m", Box::new(PassThroughOperator::new("ohlc_1m"))).unwrap();
78//!
79//! assert!(executor.is_ready());
80//! ```
81//!
82//! ## Dependency Tracking
83//!
84//! The registry tracks view dependencies and provides topological ordering
85//! for cascading evaluation.
86//!
87//! ## Cycle Detection
88//!
89//! The registry prevents cycles during registration:
90//!
91//! ```rust
92//! use laminar_core::mv::{MvRegistry, MaterializedView, MvError};
93//! use arrow_schema::{Schema, Field, DataType};
94//! use std::sync::Arc;
95//!
96//! let mut registry = MvRegistry::new();
97//! registry.register_base_table("base");
98//!
99//! let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
100//! let mv = |n: &str, s: Vec<&str>| {
101//! MaterializedView::new(n, "", s.into_iter().map(String::from).collect(), schema.clone())
102//! };
103//!
104//! registry.register(mv("a", vec!["base"])).unwrap();
105//! registry.register(mv("b", vec!["a"])).unwrap();
106//!
107//! // A newly registered view cannot close a cycle back to `a`:
108//! // (Note: a cycle through existing views would require modifying one of them, not registering a new one)
109//! ```
110//!
111//! ## Cascade Drops
112//!
113//! Use `unregister_cascade` to drop a view and all its dependents:
114//!
115//! ```rust
116//! use laminar_core::mv::{MvRegistry, MaterializedView};
117//! use arrow_schema::{Schema, Field, DataType};
118//! use std::sync::Arc;
119//!
120//! let mut registry = MvRegistry::new();
121//! registry.register_base_table("trades");
122//!
123//! let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
124//! let mv = |n: &str, s: Vec<&str>| {
125//! MaterializedView::new(n, "", s.into_iter().map(String::from).collect(), schema.clone())
126//! };
127//!
128//! registry.register(mv("ohlc_1s", vec!["trades"])).unwrap();
129//! registry.register(mv("ohlc_1m", vec!["ohlc_1s"])).unwrap();
130//! registry.register(mv("ohlc_1h", vec!["ohlc_1m"])).unwrap();
131//!
132//! // Drop ohlc_1s and all dependents
133//! let removed = registry.unregister_cascade("ohlc_1s").unwrap();
134//! assert_eq!(removed.len(), 3);
135//! assert!(registry.is_empty());
136//! ```
137
138mod error;
139mod registry;
140
141pub use error::{MvError, MvState};
142pub use registry::{MaterializedView, MvRegistry};
143
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Builds a registry holding the base table `trades` plus a linear chain
    /// of views: the first entry of `views` reads from `trades`, and every
    /// later entry reads from the view registered just before it.
    ///
    /// Each view is created with an empty SQL string and the same
    /// single-column (`v: Int64`, non-nullable) schema.
    fn linear_chain(views: &[&str]) -> MvRegistry {
        let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));

        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");

        // Walk the chain, remembering the previously registered name as the
        // sole source of the next view.
        let mut upstream = "trades";
        for &name in views {
            let view = MaterializedView::new(
                name,
                "",
                vec![upstream.to_owned()],
                Arc::clone(&schema),
            );
            registry.register(view).unwrap();
            upstream = name;
        }

        registry
    }

    /// The dependency chain of the last view lists every upstream view,
    /// ending with the view itself.
    #[test]
    fn test_dependency_chain() {
        let registry = linear_chain(&["a", "b", "c"]);

        assert_eq!(registry.dependency_chain("c"), vec!["a", "b", "c"]);
    }

    /// `dependency_chain` reports the full chain, while `get_dependencies`
    /// yields only the direct source of a view.
    #[test]
    fn test_show_dependencies_equivalent() {
        let registry = linear_chain(&["ohlc_1s", "ohlc_1m", "ohlc_1h"]);

        assert_eq!(
            registry.dependency_chain("ohlc_1h"),
            vec!["ohlc_1s", "ohlc_1m", "ohlc_1h"]
        );

        let direct_sources: Vec<_> = registry.get_dependencies("ohlc_1h").collect();
        assert_eq!(direct_sources, vec!["ohlc_1m"]);
    }
}
198}