Skip to main content

laminar_core/mv/
mod.rs

1//! # Cascading Materialized Views
2//!
3//! Materialized views that can read from other materialized views, forming a DAG.
4//! Essential for multi-resolution time-series aggregation (e.g., 1s → 1m → 1h OHLC bars).
5//!
6//! ## Key Components
7//!
8//! - [`MvRegistry`] - Manages view definitions with dependency tracking
9//! - [`MaterializedView`] - View definition with SQL, dependencies, and schema
10//!
11//! ## Architecture
12//!
13//! ```text
14//! ┌─────────────────────────────────────────────────────────────┐
15//! │                      MV Pipeline                            │
16//! ├─────────────────────────────────────────────────────────────┤
17//! │                                                             │
18//! │   Base Table         MV Level 1         MV Level 2          │
19//! │  ┌─────────┐       ┌─────────┐        ┌─────────┐          │
20//! │  │ trades  │──────▶│ ohlc_1s │───────▶│ ohlc_1m │──────▶   │
21//! │  └─────────┘       └─────────┘        └─────────┘          │
22//! │       │                 │                  │                │
23//! │       ▼                 ▼                  ▼                │
24//! │   Watermark ─────▶ Watermark ───────▶ Watermark            │
25//! │   (source)         (min of sources)  (min of sources)      │
26//! │                                                             │
27//! └─────────────────────────────────────────────────────────────┘
28//! ```
29//!
30//! ## Example: Cascading OHLC Bars
31//!
32//! ```rust,ignore
33//! use laminar_core::mv::{MvRegistry, MaterializedView, MvPipelineExecutor};
34//! use arrow_schema::{Schema, Field, DataType};
35//! use std::sync::Arc;
36//!
37//! // Create registry
38//! let mut registry = MvRegistry::new();
39//! registry.register_base_table("trades");
40//!
41//! // Define schema for OHLC bars
42//! let schema = Arc::new(Schema::new(vec![
43//!     Field::new("symbol", DataType::Utf8, false),
44//!     Field::new("open", DataType::Float64, false),
45//!     Field::new("high", DataType::Float64, false),
46//!     Field::new("low", DataType::Float64, false),
47//!     Field::new("close", DataType::Float64, false),
48//!     Field::new("volume", DataType::Int64, false),
49//! ]));
50//!
51//! // Register cascading views: trades -> ohlc_1s -> ohlc_1m
52//! let ohlc_1s = MaterializedView::new(
53//!     "ohlc_1s",
54//!     "SELECT symbol, FIRST_VALUE(price), MAX(price), MIN(price), LAST_VALUE(price), SUM(qty) FROM trades GROUP BY symbol, TUMBLE(ts, '1 second')",
55//!     vec!["trades".into()],
56//!     schema.clone(),
57//! );
58//! registry.register(ohlc_1s).unwrap();
59//!
60//! let ohlc_1m = MaterializedView::new(
61//!     "ohlc_1m",
62//!     "SELECT symbol, FIRST_VALUE(open), MAX(high), MIN(low), LAST_VALUE(close), SUM(volume) FROM ohlc_1s GROUP BY symbol, TUMBLE(bar_time, '1 minute')",
63//!     vec!["ohlc_1s".into()],
64//!     schema.clone(),
65//! );
66//! registry.register(ohlc_1m).unwrap();
67//!
68//! // Views are processed in topological order
69//! assert_eq!(registry.topo_order(), &["ohlc_1s", "ohlc_1m"]);
70//!
71//! // Create executor
72//! let registry = Arc::new(registry);
73//! let mut executor = MvPipelineExecutor::new(Arc::clone(&registry));
74//!
75//! // Register operators for each view
76//! executor.register_operator("ohlc_1s", Box::new(PassThroughOperator::new("ohlc_1s"))).unwrap();
77//! executor.register_operator("ohlc_1m", Box::new(PassThroughOperator::new("ohlc_1m"))).unwrap();
78//!
79//! assert!(executor.is_ready());
80//! ```
81//!
82//! ## Dependency Tracking
83//!
84//! The registry tracks view dependencies and provides topological ordering
85//! for cascading evaluation.
86//!
87//! ## Cycle Detection
88//!
89//! The registry prevents cycles during registration:
90//!
91//! ```rust
92//! use laminar_core::mv::{MvRegistry, MaterializedView, MvError};
93//! use arrow_schema::{Schema, Field, DataType};
94//! use std::sync::Arc;
95//!
96//! let mut registry = MvRegistry::new();
97//! registry.register_base_table("base");
98//!
99//! let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
100//! let mv = |n: &str, s: Vec<&str>| {
101//!     MaterializedView::new(n, "", s.into_iter().map(String::from).collect(), schema.clone())
102//! };
103//!
104//! registry.register(mv("a", vec!["base"])).unwrap();
105//! registry.register(mv("b", vec!["a"])).unwrap();
106//!
107//! // A cycle back to `a` cannot be created by registering another view here
108//! // (note: closing a cycle through existing views would require modifying one of them, not a new registration).
109//! ```
110//!
111//! ## Cascade Drops
112//!
113//! Use `unregister_cascade` to drop a view and all its dependents:
114//!
115//! ```rust
116//! use laminar_core::mv::{MvRegistry, MaterializedView};
117//! use arrow_schema::{Schema, Field, DataType};
118//! use std::sync::Arc;
119//!
120//! let mut registry = MvRegistry::new();
121//! registry.register_base_table("trades");
122//!
123//! let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
124//! let mv = |n: &str, s: Vec<&str>| {
125//!     MaterializedView::new(n, "", s.into_iter().map(String::from).collect(), schema.clone())
126//! };
127//!
128//! registry.register(mv("ohlc_1s", vec!["trades"])).unwrap();
129//! registry.register(mv("ohlc_1m", vec!["ohlc_1s"])).unwrap();
130//! registry.register(mv("ohlc_1h", vec!["ohlc_1m"])).unwrap();
131//!
132//! // Drop ohlc_1s and all dependents
133//! let removed = registry.unregister_cascade("ohlc_1s").unwrap();
134//! assert_eq!(removed.len(), 3);
135//! assert!(registry.is_empty());
136//! ```
137
138mod error;
139mod registry;
140
141pub use error::{MvError, MvState};
142pub use registry::{MaterializedView, MvRegistry};
143
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Registers the base table `trades`, then registers each name in `views`
    /// as a materialized view whose only source is the previously registered
    /// name, producing the linear chain `trades -> views[0] -> views[1] -> ...`.
    ///
    /// Every view is created with an empty SQL string and shares one schema
    /// made of a single non-nullable `Int64` column named `v`.
    ///
    /// Panics if any registration is rejected by the registry.
    fn register_chain(registry: &mut MvRegistry, views: &[&str]) {
        let shared_schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
        registry.register_base_table("trades");

        // Each view reads from whatever was registered immediately before it.
        let mut upstream = "trades";
        for &view_name in views {
            let view = MaterializedView::new(
                view_name,
                "",
                std::iter::once(upstream).map(String::from).collect(),
                Arc::clone(&shared_schema),
            );
            registry.register(view).unwrap();
            upstream = view_name;
        }
    }

    /// The chain reported for the last view of `trades -> a -> b -> c` lists
    /// the views from the first one up to and including the queried view.
    #[test]
    fn test_dependency_chain() {
        let mut registry = MvRegistry::new();
        register_chain(&mut registry, &["a", "b", "c"]);

        let chain = registry.dependency_chain("c");
        assert_eq!(chain, vec!["a", "b", "c"]);
    }

    /// For a three-level OHLC cascade, `dependency_chain` reports the whole
    /// chain of views while `get_dependencies` reports only the direct source.
    #[test]
    fn test_show_dependencies_equivalent() {
        let mut registry = MvRegistry::new();
        register_chain(&mut registry, &["ohlc_1s", "ohlc_1m", "ohlc_1h"]);

        // Whole chain of views ending at the queried one; the base table is not listed.
        let chain = registry.dependency_chain("ohlc_1h");
        assert_eq!(chain, vec!["ohlc_1s", "ohlc_1m", "ohlc_1h"]);

        // Only the immediate upstream view.
        let direct: Vec<_> = registry.get_dependencies("ohlc_1h").collect();
        assert_eq!(direct, vec!["ohlc_1m"]);
    }
}