§Cascading Materialized Views
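Materialized views can read from other materialized views, so the set of views forms a dependency DAG. This is essential for multi-resolution time-series aggregation (e.g., 1s → 1m → 1h OHLC bars), where each coarser view is computed from the next finer one rather than from the raw data.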
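To make the cascade concrete, here is a minimal standalone sketch of rolling fine-grained OHLC bars into coarser ones. It does not use the laminar_core API; the `Bar` struct, its `start_secs` field, and the `roll_up` function are hypothetical names introduced only for illustration, and the input is assumed to be sorted by time.

```rust
/// One OHLC bar. `start_secs` is a hypothetical bar-open timestamp in seconds.
#[derive(Debug, Clone, PartialEq)]
struct Bar {
    start_secs: u64,
    open: f64,
    high: f64,
    low: f64,
    close: f64,
    volume: i64,
}

/// Roll time-ordered bars into coarser bars that are `width_secs` wide.
fn roll_up(bars: &[Bar], width_secs: u64) -> Vec<Bar> {
    let mut out: Vec<Bar> = Vec::new();
    for b in bars {
        // Align the bar to the start of its coarser bucket.
        let bucket = b.start_secs - b.start_secs % width_secs;
        let same_bucket = out.last().map_or(false, |cur| cur.start_secs == bucket);
        if same_bucket {
            let cur = out.last_mut().expect("checked above");
            cur.high = cur.high.max(b.high); // MAX(high)
            cur.low = cur.low.min(b.low); // MIN(low)
            cur.close = b.close; // last close wins
            cur.volume += b.volume; // SUM(volume)
        } else {
            // First bar of a new bucket supplies the open.
            out.push(Bar { start_secs: bucket, ..b.clone() });
        }
    }
    out
}

fn main() {
    let one_sec = vec![
        Bar { start_secs: 0, open: 10.0, high: 11.0, low: 9.5, close: 10.5, volume: 100 },
        Bar { start_secs: 1, open: 10.5, high: 12.0, low: 10.0, close: 11.5, volume: 50 },
        Bar { start_secs: 60, open: 11.5, high: 11.5, low: 11.0, close: 11.0, volume: 25 },
    ];
    let one_min = roll_up(&one_sec, 60);
    assert_eq!(one_min.len(), 2);
    assert_eq!(one_min[0].high, 12.0);
    assert_eq!(one_min[0].volume, 150);
}
```

The same function can be applied again to the 1-minute output with a width of 3600 to obtain hourly bars, which is the property that makes cascading views attractive.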
§Key Components
- MvRegistry - Manages view definitions with dependency tracking
- MaterializedView - View definition with SQL, dependencies, and schema
§Architecture
┌─────────────────────────────────────────────────────────────┐
│ MV Pipeline │
├─────────────────────────────────────────────────────────────┤
│ │
│ Base Table MV Level 1 MV Level 2 │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ trades │──────▶│ ohlc_1s │───────▶│ ohlc_1m │──────▶ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ Watermark ─────▶ Watermark ───────▶ Watermark │
│ (source) (min of sources) (min of sources) │
│ │
└─────────────────────────────────────────────────────────────┘
§Example: Cascading OHLC Bars
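Before the registry example, the watermark rule from the architecture diagram (each view's watermark is the minimum of its sources' watermarks) can be sketched on its own. This is a standalone illustration, not the laminar_core implementation: `propagate_watermarks`, the `i64` millisecond watermark type, and the `quotes` and `enriched` names are assumptions made for the example.

```rust
use std::collections::HashMap;

/// Propagate watermarks through views given in topological order.
/// `sources_of` maps each view to its direct sources; `watermarks` starts out
/// holding only base-table watermarks (hypothetical event-time milliseconds).
fn propagate_watermarks(
    topo_order: &[&str],
    sources_of: &HashMap<&str, Vec<&str>>,
    watermarks: &mut HashMap<String, i64>,
) {
    for view in topo_order {
        let Some(srcs) = sources_of.get(view) else { continue };
        // A view only gets a watermark once every source has one.
        let known: Option<Vec<i64>> =
            srcs.iter().map(|s| watermarks.get(*s).copied()).collect();
        if let Some(min_wm) = known.and_then(|v| v.into_iter().min()) {
            watermarks.insert(view.to_string(), min_wm);
        }
    }
}

fn main() {
    let mut sources_of = HashMap::new();
    sources_of.insert("ohlc_1s", vec!["trades"]);
    sources_of.insert("ohlc_1m", vec!["ohlc_1s"]);
    // Hypothetical view with two sources, to show the `min` taking effect.
    sources_of.insert("enriched", vec!["ohlc_1s", "quotes"]);

    let mut watermarks = HashMap::new();
    watermarks.insert("trades".to_string(), 1_000);
    watermarks.insert("quotes".to_string(), 900);

    propagate_watermarks(&["ohlc_1s", "ohlc_1m", "enriched"], &sources_of, &mut watermarks);

    assert_eq!(watermarks["ohlc_1m"], 1_000);
    assert_eq!(watermarks["enriched"], 900);
}
```

Processing views in topological order matters here: a view's sources must already have their watermarks computed when the view is visited.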
use laminar_core::mv::{MvRegistry, MaterializedView, MvPipelineExecutor};
use arrow_schema::{Schema, Field, DataType};
use std::sync::Arc;
// Create registry
let mut registry = MvRegistry::new();
registry.register_base_table("trades");
// Define schema for OHLC bars
let schema = Arc::new(Schema::new(vec![
Field::new("symbol", DataType::Utf8, false),
Field::new("open", DataType::Float64, false),
Field::new("high", DataType::Float64, false),
Field::new("low", DataType::Float64, false),
Field::new("close", DataType::Float64, false),
Field::new("volume", DataType::Int64, false),
]));
// Register cascading views: trades -> ohlc_1s -> ohlc_1m
let ohlc_1s = MaterializedView::new(
"ohlc_1s",
"SELECT symbol, FIRST_VALUE(price), MAX(price), MIN(price), LAST_VALUE(price), SUM(qty) FROM trades GROUP BY symbol, TUMBLE(ts, '1 second')",
vec!["trades".into()],
schema.clone(),
);
registry.register(ohlc_1s).unwrap();
let ohlc_1m = MaterializedView::new(
"ohlc_1m",
"SELECT symbol, FIRST_VALUE(open), MAX(high), MIN(low), LAST_VALUE(close), SUM(volume) FROM ohlc_1s GROUP BY symbol, TUMBLE(bar_time, '1 minute')",
vec!["ohlc_1s".into()],
schema.clone(),
);
registry.register(ohlc_1m).unwrap();
// Views are processed in topological order
assert_eq!(registry.topo_order(), &["ohlc_1s", "ohlc_1m"]);
// Create executor
let registry = Arc::new(registry);
let mut executor = MvPipelineExecutor::new(Arc::clone(&registry));
// Register operators for each view.
// PassThroughOperator is assumed to be in scope; its import is not shown in this example.
executor.register_operator("ohlc_1s", Box::new(PassThroughOperator::new("ohlc_1s"))).unwrap();
executor.register_operator("ohlc_1m", Box::new(PassThroughOperator::new("ohlc_1m"))).unwrap();
assert!(executor.is_ready());
§Dependency Tracking
The registry tracks view dependencies and provides topological ordering for cascading evaluation.
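As a rough illustration of what topological ordering over view dependencies involves, the sketch below applies Kahn's algorithm to a plain map of view names. It is independent of `MvRegistry` and makes no claim about how the registry is implemented; `topological_order` and the `deps` map are hypothetical, and any source name that is not itself a key is treated as a base table.

```rust
use std::collections::{BTreeMap, VecDeque};

/// Return views in dependency order, or `None` if the graph contains a cycle.
/// `deps` maps each view to its direct sources.
fn topological_order(deps: &BTreeMap<&str, Vec<&str>>) -> Option<Vec<String>> {
    // pending[v]  = number of v's sources that are views and not yet emitted.
    // dependents[s] = views that read directly from view s.
    let mut pending: BTreeMap<&str, usize> = BTreeMap::new();
    let mut dependents: BTreeMap<&str, Vec<&str>> = BTreeMap::new();
    for (&view, sources) in deps {
        let mut count = 0;
        for &s in sources {
            if deps.contains_key(s) {
                count += 1;
                dependents.entry(s).or_default().push(view);
            }
        }
        pending.insert(view, count);
    }

    // Start with views that depend only on base tables.
    let mut ready: VecDeque<&str> = pending
        .iter()
        .filter(|&(_, &n)| n == 0)
        .map(|(&v, _)| v)
        .collect();

    let mut order = Vec::new();
    while let Some(v) = ready.pop_front() {
        order.push(v.to_string());
        for &d in dependents.get(v).into_iter().flatten() {
            let n = pending.get_mut(d).expect("dependent is a known view");
            *n -= 1;
            if *n == 0 {
                ready.push_back(d);
            }
        }
    }

    // Any view never emitted is part of, or downstream of, a cycle.
    (order.len() == deps.len()).then_some(order)
}

fn main() {
    let mut deps = BTreeMap::new();
    deps.insert("ohlc_1h", vec!["ohlc_1m"]);
    deps.insert("ohlc_1m", vec!["ohlc_1s"]);
    deps.insert("ohlc_1s", vec!["trades"]);
    let order = topological_order(&deps).expect("acyclic");
    assert_eq!(order, ["ohlc_1s", "ohlc_1m", "ohlc_1h"]);
}
```

The same pass doubles as a cycle check: if two views depend on each other, neither ever reaches a pending count of zero, so the function returns `None`.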
§Cycle Detection
The registry prevents cycles during registration:
use laminar_core::mv::{MvRegistry, MaterializedView, MvError};
use arrow_schema::{Schema, Field, DataType};
use std::sync::Arc;
let mut registry = MvRegistry::new();
registry.register_base_table("base");
let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
let mv = |n: &str, s: Vec<&str>| {
MaterializedView::new(n, "", s.into_iter().map(String::from).collect(), schema.clone())
};
registry.register(mv("a", vec!["base"])).unwrap();
registry.register(mv("b", vec!["a"])).unwrap();
// Can't create a cycle back to a
// (Note: cycles through existing views require modification, not new registration)
§Cascade Drops
Use unregister_cascade to drop a view and all its dependents:
use laminar_core::mv::{MvRegistry, MaterializedView};
use arrow_schema::{Schema, Field, DataType};
use std::sync::Arc;
let mut registry = MvRegistry::new();
registry.register_base_table("trades");
let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
let mv = |n: &str, s: Vec<&str>| {
MaterializedView::new(n, "", s.into_iter().map(String::from).collect(), schema.clone())
};
registry.register(mv("ohlc_1s", vec!["trades"])).unwrap();
registry.register(mv("ohlc_1m", vec!["ohlc_1s"])).unwrap();
registry.register(mv("ohlc_1h", vec!["ohlc_1m"])).unwrap();
// Drop ohlc_1s and all dependents
let removed = registry.unregister_cascade("ohlc_1s").unwrap();
assert_eq!(removed.len(), 3);
assert!(registry.is_empty());
Structs§
- MaterializedView
- Materialized view definition.
- MvRegistry
- Registry for managing materialized views.