Skip to main content

laminar_core/mv/
registry.rs

//! Materialized view registry with dependency tracking.
//!
//! The registry maintains a directed acyclic graph (DAG) of materialized views,
//! tracking dependencies between views and ensuring correct processing order.

6use super::error::{MvError, MvState};
7use arrow_schema::SchemaRef;
8use rustc_hash::{FxHashMap, FxHashSet};
9use std::collections::VecDeque;
10
11/// Materialized view definition.
12///
13/// A materialized view is a query result that is stored and incrementally
14/// maintained as its source data changes.
15#[derive(Debug, Clone)]
16pub struct MaterializedView {
17    /// Unique view name.
18    pub name: String,
19    /// SQL definition (for reference and introspection).
20    pub sql: String,
21    /// Input sources (base tables or other MVs).
22    pub sources: Vec<String>,
23    /// Output schema of the view.
24    pub schema: SchemaRef,
25    /// Associated operator ID for event routing.
26    pub operator_id: String,
27    /// Current execution state.
28    pub state: MvState,
29}
30
31impl MaterializedView {
32    /// Creates a new materialized view definition.
33    #[must_use]
34    pub fn new(
35        name: impl Into<String>,
36        sql: impl Into<String>,
37        sources: Vec<String>,
38        schema: SchemaRef,
39    ) -> Self {
40        let name = name.into();
41        let operator_id = format!("mv_{name}");
42        Self {
43            name,
44            sql: sql.into(),
45            sources,
46            schema,
47            operator_id,
48            state: MvState::Running,
49        }
50    }
51
52    /// Creates a simple view with no schema (for testing).
53    #[cfg(test)]
54    pub fn simple(name: impl Into<String>, sources: Vec<String>) -> Self {
55        use arrow_schema::{DataType, Field, Schema};
56        use std::sync::Arc;
57
58        let schema = Arc::new(Schema::new(vec![Field::new(
59            "value",
60            DataType::Int64,
61            false,
62        )]));
63        Self::new(name, "", sources, schema)
64    }
65
66    /// Returns true if this view depends on the given source.
67    #[must_use]
68    pub fn depends_on(&self, source: &str) -> bool {
69        self.sources.iter().any(|s| s == source)
70    }
71}
72
73/// Registry for managing materialized views: stores the dependency DAG,
74/// detects cycles on registration, and exposes a topological order for
75/// correct processing.
76#[derive(Debug, Default)]
77pub struct MvRegistry {
78    /// All registered MVs by name.
79    views: FxHashMap<String, MaterializedView>,
80    /// Base tables (sources that are not MVs).
81    base_tables: FxHashSet<String>,
82    /// Dependency graph: MV name -> MVs that depend on it.
83    dependents: FxHashMap<String, FxHashSet<String>>,
84    /// Reverse dependency graph: MV name -> MVs it depends on.
85    dependencies: FxHashMap<String, FxHashSet<String>>,
86    /// Topological order for processing (dependencies first).
87    topo_order: Vec<String>,
88}
89
90impl MvRegistry {
91    /// Creates an empty registry.
92    #[must_use]
93    pub fn new() -> Self {
94        Self::default()
95    }
96
97    /// Registers a base table (source that is not an MV).
98    ///
99    /// Base tables are assumed to exist and can be referenced as sources
100    /// by materialized views.
101    pub fn register_base_table(&mut self, name: impl Into<String>) {
102        self.base_tables.insert(name.into());
103    }
104
105    /// Returns true if the given name is a registered base table.
106    #[must_use]
107    pub fn is_base_table(&self, name: &str) -> bool {
108        self.base_tables.contains(name)
109    }
110
111    /// Registers a new materialized view.
112    ///
113    /// # Errors
114    ///
115    /// Returns error if:
116    /// - View name already exists
117    /// - Source MV or base table doesn't exist
118    /// - Would create a dependency cycle
119    pub fn register(&mut self, view: MaterializedView) -> Result<(), MvError> {
120        // Check for duplicate name
121        if self.views.contains_key(&view.name) {
122            return Err(MvError::DuplicateName(view.name.clone()));
123        }
124
125        // Validate sources exist
126        for source in &view.sources {
127            if !self.views.contains_key(source) && !self.is_base_table(source) {
128                return Err(MvError::SourceNotFound(source.clone()));
129            }
130        }
131
132        // Check for cycles
133        if self.would_create_cycle(&view.name, &view.sources) {
134            return Err(MvError::CycleDetected(view.name.clone()));
135        }
136
137        // Update dependency graphs
138        for source in &view.sources {
139            self.dependents
140                .entry(source.clone())
141                .or_default()
142                .insert(view.name.clone());
143            self.dependencies
144                .entry(view.name.clone())
145                .or_default()
146                .insert(source.clone());
147        }
148
149        self.views.insert(view.name.clone(), view);
150        self.update_topo_order();
151
152        Ok(())
153    }
154
155    /// Unregisters a materialized view.
156    ///
157    /// # Errors
158    ///
159    /// Returns error if:
160    /// - View doesn't exist
161    /// - Other views depend on it (use `unregister_cascade` instead)
162    pub fn unregister(&mut self, name: &str) -> Result<MaterializedView, MvError> {
163        // Check if view exists
164        if !self.views.contains_key(name) {
165            return Err(MvError::ViewNotFound(name.to_string()));
166        }
167
168        // Check for dependents
169        if let Some(deps) = self.dependents.get(name) {
170            if !deps.is_empty() {
171                let dep_names: Vec<_> = deps.iter().cloned().collect();
172                return Err(MvError::HasDependents(name.to_string(), dep_names));
173            }
174        }
175
176        self.remove_view(name)
177    }
178
179    /// Unregisters a materialized view and all views that depend on it.
180    ///
181    /// Returns the views that were removed, in dependency order (dependents first).
182    ///
183    /// # Errors
184    ///
185    /// Returns error if the view doesn't exist.
186    pub fn unregister_cascade(&mut self, name: &str) -> Result<Vec<MaterializedView>, MvError> {
187        if !self.views.contains_key(name) {
188            return Err(MvError::ViewNotFound(name.to_string()));
189        }
190
191        // Collect all views to remove in dependency order (dependents first)
192        let mut to_remove = Vec::new();
193        self.collect_dependents_recursive(name, &mut to_remove);
194        to_remove.push(name.to_string());
195
196        // Remove in collected order (dependents first, then the view itself)
197        let mut removed = Vec::with_capacity(to_remove.len());
198        for view_name in to_remove {
199            if let Ok(view) = self.remove_view(&view_name) {
200                removed.push(view);
201            }
202        }
203
204        Ok(removed)
205    }
206
207    fn collect_dependents_recursive(&self, name: &str, result: &mut Vec<String>) {
208        if let Some(deps) = self.dependents.get(name) {
209            for dep in deps {
210                if !result.contains(dep) {
211                    self.collect_dependents_recursive(dep, result);
212                    result.push(dep.clone());
213                }
214            }
215        }
216    }
217
218    fn remove_view(&mut self, name: &str) -> Result<MaterializedView, MvError> {
219        let view = self
220            .views
221            .remove(name)
222            .ok_or_else(|| MvError::ViewNotFound(name.to_string()))?;
223
224        // Remove from dependency tracking
225        if let Some(sources) = self.dependencies.remove(name) {
226            for source in sources {
227                if let Some(deps) = self.dependents.get_mut(&source) {
228                    deps.remove(name);
229                }
230            }
231        }
232        self.dependents.remove(name);
233
234        // Update topological order
235        self.update_topo_order();
236
237        Ok(view)
238    }
239
240    /// Gets a view by name.
241    #[must_use]
242    pub fn get(&self, name: &str) -> Option<&MaterializedView> {
243        self.views.get(name)
244    }
245
246    /// Gets a mutable reference to a view by name.
247    #[must_use]
248    pub fn get_mut(&mut self, name: &str) -> Option<&mut MaterializedView> {
249        self.views.get_mut(name)
250    }
251
252    /// Returns all views in topological order (dependencies first).
253    #[must_use]
254    pub fn topo_order(&self) -> &[String] {
255        &self.topo_order
256    }
257
258    /// Returns all views that depend on the given source.
259    pub fn get_dependents(&self, source: &str) -> impl Iterator<Item = &str> {
260        self.dependents
261            .get(source)
262            .into_iter()
263            .flatten()
264            .map(String::as_str)
265    }
266
267    /// Returns all sources that the given view depends on.
268    pub fn get_dependencies(&self, view: &str) -> impl Iterator<Item = &str> {
269        self.dependencies
270            .get(view)
271            .into_iter()
272            .flatten()
273            .map(String::as_str)
274    }
275
276    /// Returns the number of registered views.
277    #[must_use]
278    pub fn len(&self) -> usize {
279        self.views.len()
280    }
281
282    /// Returns true if no views are registered.
283    #[must_use]
284    pub fn is_empty(&self) -> bool {
285        self.views.is_empty()
286    }
287
288    /// Returns an iterator over all registered views.
289    pub fn views(&self) -> impl Iterator<Item = &MaterializedView> {
290        self.views.values()
291    }
292
293    /// Returns the set of registered base tables.
294    #[must_use]
295    pub fn base_tables(&self) -> &FxHashSet<String> {
296        &self.base_tables
297    }
298
299    /// Returns the full dependency chain for a view (including transitive).
300    ///
301    /// The chain is returned in topological order (dependencies first).
302    #[must_use]
303    pub fn dependency_chain(&self, name: &str) -> Vec<String> {
304        let mut chain = Vec::new();
305        let mut visited = FxHashSet::default();
306        self.collect_dependencies_recursive(name, &mut chain, &mut visited);
307        chain
308    }
309
310    fn collect_dependencies_recursive(
311        &self,
312        name: &str,
313        result: &mut Vec<String>,
314        visited: &mut FxHashSet<String>,
315    ) {
316        if !visited.insert(name.to_string()) {
317            return;
318        }
319
320        if let Some(deps) = self.dependencies.get(name) {
321            for dep in deps {
322                self.collect_dependencies_recursive(dep, result, visited);
323            }
324        }
325
326        // Only add MVs, not base tables
327        if self.views.contains_key(name) {
328            result.push(name.to_string());
329        }
330    }
331
332    fn would_create_cycle(&self, new_name: &str, sources: &[String]) -> bool {
333        // DFS to check if any source transitively depends on new_name
334        let mut visited = FxHashSet::default();
335        let mut stack: Vec<_> = sources.to_vec();
336
337        while let Some(current) = stack.pop() {
338            if current == new_name {
339                return true;
340            }
341            if visited.insert(current.clone()) {
342                if let Some(deps) = self.dependencies.get(&current) {
343                    stack.extend(deps.iter().cloned());
344                }
345            }
346        }
347
348        false
349    }
350
351    fn update_topo_order(&mut self) {
352        // Kahn's algorithm for topological sort
353        let mut in_degree: FxHashMap<String, usize> = FxHashMap::default();
354        let mut queue: VecDeque<String> = VecDeque::new();
355
356        // Initialize in-degrees (count only MV dependencies, not base tables)
357        for name in self.views.keys() {
358            let deps = self.dependencies.get(name).map_or(0, |d| {
359                d.iter().filter(|dep| self.views.contains_key(*dep)).count()
360            });
361            in_degree.insert(name.clone(), deps);
362            if deps == 0 {
363                queue.push_back(name.clone());
364            }
365        }
366
367        // Process
368        self.topo_order.clear();
369        while let Some(name) = queue.pop_front() {
370            self.topo_order.push(name.clone());
371
372            if let Some(dependents) = self.dependents.get(&name) {
373                for dep in dependents {
374                    if let Some(count) = in_degree.get_mut(dep) {
375                        *count = count.saturating_sub(1);
376                        if *count == 0 {
377                            queue.push_back(dep.clone());
378                        }
379                    }
380                }
381            }
382        }
383    }
384}
385
#[cfg(test)]
mod tests {
    use super::*;

    fn mv(name: &str, sources: Vec<&str>) -> MaterializedView {
        MaterializedView::simple(name, sources.into_iter().map(String::from).collect())
    }

    #[test]
    fn test_simple_registration() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");

        let view = mv("ohlc_1s", vec!["trades"]);
        registry.register(view).unwrap();

        assert_eq!(registry.len(), 1);
        assert!(registry.get("ohlc_1s").is_some());
    }

    #[test]
    fn test_cascading_registration() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");

        registry.register(mv("ohlc_1s", vec!["trades"])).unwrap();
        registry.register(mv("ohlc_1m", vec!["ohlc_1s"])).unwrap();
        registry.register(mv("ohlc_1h", vec!["ohlc_1m"])).unwrap();

        assert_eq!(registry.topo_order(), &["ohlc_1s", "ohlc_1m", "ohlc_1h"]);
    }

    #[test]
    fn test_duplicate_name_error() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");

        registry.register(mv("ohlc_1s", vec!["trades"])).unwrap();

        let result = registry.register(mv("ohlc_1s", vec!["trades"]));
        assert!(matches!(result, Err(MvError::DuplicateName(_))));
    }

    #[test]
    fn test_source_not_found_error() {
        let mut registry = MvRegistry::new();

        let result = registry.register(mv("view", vec!["nonexistent"]));
        assert!(matches!(result, Err(MvError::SourceNotFound(_))));
    }

    #[test]
    fn test_cycle_detection_direct() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("a");

        registry.register(mv("b", vec!["a"])).unwrap();
        registry.register(mv("c", vec!["b"])).unwrap();
        registry.register(mv("d", vec!["c"])).unwrap();

        // Closing the loop c -> d -> c would require re-registering `c` on top
        // of `d`; the duplicate-name check rejects that before any edge exists.
        let result = registry.register(mv("c", vec!["d"]));
        assert!(matches!(result, Err(MvError::DuplicateName(_))));

        // A view cannot list itself as a source: it is not registered yet, so
        // the reference is an unknown source.
        let result = registry.register(mv("e", vec!["e"]));
        assert!(matches!(result, Err(MvError::SourceNotFound(_))));

        // Failed registrations must leave the graph untouched.
        assert_eq!(registry.len(), 3);
        assert_eq!(registry.topo_order(), &["b", "c", "d"]);
        let c_sources: Vec<_> = registry.get_dependencies("c").collect();
        assert_eq!(c_sources, vec!["b"]);
    }

    #[test]
    fn test_multi_source_view() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("orders");
        registry.register_base_table("payments");

        // View that joins two base tables
        registry
            .register(mv("order_payments", vec!["orders", "payments"]))
            .unwrap();

        assert_eq!(registry.topo_order(), &["order_payments"]);

        // Check dependencies
        let deps: Vec<_> = registry.get_dependencies("order_payments").collect();
        assert!(deps.contains(&"orders"));
        assert!(deps.contains(&"payments"));
    }

    #[test]
    fn test_diamond_dependency() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("source");

        //       source
        //       /    \
        //      a      b
        //       \    /
        //         c
        registry.register(mv("a", vec!["source"])).unwrap();
        registry.register(mv("b", vec!["source"])).unwrap();
        registry.register(mv("c", vec!["a", "b"])).unwrap();

        // c should come last
        let order = registry.topo_order();
        let c_idx = order.iter().position(|x| x == "c").unwrap();
        let a_idx = order.iter().position(|x| x == "a").unwrap();
        let b_idx = order.iter().position(|x| x == "b").unwrap();

        assert!(c_idx > a_idx);
        assert!(c_idx > b_idx);
    }

    #[test]
    fn test_unregister_simple() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");
        registry.register(mv("ohlc_1s", vec!["trades"])).unwrap();

        let removed = registry.unregister("ohlc_1s").unwrap();
        assert_eq!(removed.name, "ohlc_1s");
        assert!(registry.is_empty());
    }

    #[test]
    fn test_unregister_with_dependents_error() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");
        registry.register(mv("ohlc_1s", vec!["trades"])).unwrap();
        registry.register(mv("ohlc_1m", vec!["ohlc_1s"])).unwrap();

        let result = registry.unregister("ohlc_1s");
        assert!(matches!(result, Err(MvError::HasDependents(_, _))));
    }

    #[test]
    fn test_unregister_cascade() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");
        registry.register(mv("ohlc_1s", vec!["trades"])).unwrap();
        registry.register(mv("ohlc_1m", vec!["ohlc_1s"])).unwrap();
        registry.register(mv("ohlc_1h", vec!["ohlc_1m"])).unwrap();

        let removed = registry.unregister_cascade("ohlc_1s").unwrap();

        // All three should be removed
        assert_eq!(removed.len(), 3);
        assert!(registry.is_empty());

        // Removed in reverse order (dependents first)
        assert_eq!(removed[0].name, "ohlc_1h");
        assert_eq!(removed[1].name, "ohlc_1m");
        assert_eq!(removed[2].name, "ohlc_1s");
    }

    #[test]
    fn test_unregister_cascade_diamond() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("source");
        registry.register(mv("a", vec!["source"])).unwrap();
        registry.register(mv("b", vec!["source"])).unwrap();
        registry.register(mv("c", vec!["a", "b"])).unwrap();

        // Removing `a` must also remove `c` (which reads from it), first.
        let removed = registry.unregister_cascade("a").unwrap();
        let names: Vec<_> = removed.iter().map(|v| v.name.as_str()).collect();
        assert_eq!(names, vec!["c", "a"]);

        // `b` survives with no remaining dependents.
        assert_eq!(registry.topo_order(), &["b"]);
        assert_eq!(registry.get_dependents("b").count(), 0);
        let source_dependents: Vec<_> = registry.get_dependents("source").collect();
        assert_eq!(source_dependents, vec!["b"]);
    }

    #[test]
    fn test_dependency_chain() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");
        registry.register(mv("ohlc_1s", vec!["trades"])).unwrap();
        registry.register(mv("ohlc_1m", vec!["ohlc_1s"])).unwrap();
        registry.register(mv("ohlc_1h", vec!["ohlc_1m"])).unwrap();

        let chain = registry.dependency_chain("ohlc_1h");
        assert_eq!(chain, vec!["ohlc_1s", "ohlc_1m", "ohlc_1h"]);
    }

    #[test]
    fn test_get_dependents() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");
        registry.register(mv("a", vec!["trades"])).unwrap();
        registry.register(mv("b", vec!["trades"])).unwrap();
        registry.register(mv("c", vec!["a"])).unwrap();

        let dependents: Vec<_> = registry.get_dependents("trades").collect();
        assert!(dependents.contains(&"a"));
        assert!(dependents.contains(&"b"));
        assert!(!dependents.contains(&"c"));

        let a_dependents: Vec<_> = registry.get_dependents("a").collect();
        assert_eq!(a_dependents, vec!["c"]);
    }

    #[test]
    fn test_view_state_update() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");
        registry.register(mv("ohlc_1s", vec!["trades"])).unwrap();

        let view = registry.get_mut("ohlc_1s").unwrap();
        assert_eq!(view.state, MvState::Running);

        view.state = MvState::Dropping;
        assert_eq!(view.state, MvState::Dropping);
    }
}