1use super::error::{MvError, MvState};
7use arrow_schema::SchemaRef;
8use rustc_hash::{FxHashMap, FxHashSet};
9use std::collections::VecDeque;
10
11#[derive(Debug, Clone)]
16pub struct MaterializedView {
17 pub name: String,
19 pub sql: String,
21 pub sources: Vec<String>,
23 pub schema: SchemaRef,
25 pub operator_id: String,
27 pub state: MvState,
29}
30
31impl MaterializedView {
32 #[must_use]
34 pub fn new(
35 name: impl Into<String>,
36 sql: impl Into<String>,
37 sources: Vec<String>,
38 schema: SchemaRef,
39 ) -> Self {
40 let name = name.into();
41 let operator_id = format!("mv_{name}");
42 Self {
43 name,
44 sql: sql.into(),
45 sources,
46 schema,
47 operator_id,
48 state: MvState::Running,
49 }
50 }
51
52 #[cfg(test)]
54 pub fn simple(name: impl Into<String>, sources: Vec<String>) -> Self {
55 use arrow_schema::{DataType, Field, Schema};
56 use std::sync::Arc;
57
58 let schema = Arc::new(Schema::new(vec![Field::new(
59 "value",
60 DataType::Int64,
61 false,
62 )]));
63 Self::new(name, "", sources, schema)
64 }
65
66 #[must_use]
68 pub fn depends_on(&self, source: &str) -> bool {
69 self.sources.iter().any(|s| s == source)
70 }
71}
72
73#[derive(Debug, Default)]
77pub struct MvRegistry {
78 views: FxHashMap<String, MaterializedView>,
80 base_tables: FxHashSet<String>,
82 dependents: FxHashMap<String, FxHashSet<String>>,
84 dependencies: FxHashMap<String, FxHashSet<String>>,
86 topo_order: Vec<String>,
88}
89
90impl MvRegistry {
91 #[must_use]
93 pub fn new() -> Self {
94 Self::default()
95 }
96
97 pub fn register_base_table(&mut self, name: impl Into<String>) {
102 self.base_tables.insert(name.into());
103 }
104
105 #[must_use]
107 pub fn is_base_table(&self, name: &str) -> bool {
108 self.base_tables.contains(name)
109 }
110
111 pub fn register(&mut self, view: MaterializedView) -> Result<(), MvError> {
120 if self.views.contains_key(&view.name) {
122 return Err(MvError::DuplicateName(view.name.clone()));
123 }
124
125 for source in &view.sources {
127 if !self.views.contains_key(source) && !self.is_base_table(source) {
128 return Err(MvError::SourceNotFound(source.clone()));
129 }
130 }
131
132 if self.would_create_cycle(&view.name, &view.sources) {
134 return Err(MvError::CycleDetected(view.name.clone()));
135 }
136
137 for source in &view.sources {
139 self.dependents
140 .entry(source.clone())
141 .or_default()
142 .insert(view.name.clone());
143 self.dependencies
144 .entry(view.name.clone())
145 .or_default()
146 .insert(source.clone());
147 }
148
149 self.views.insert(view.name.clone(), view);
150 self.update_topo_order();
151
152 Ok(())
153 }
154
155 pub fn unregister(&mut self, name: &str) -> Result<MaterializedView, MvError> {
163 if !self.views.contains_key(name) {
165 return Err(MvError::ViewNotFound(name.to_string()));
166 }
167
168 if let Some(deps) = self.dependents.get(name) {
170 if !deps.is_empty() {
171 let dep_names: Vec<_> = deps.iter().cloned().collect();
172 return Err(MvError::HasDependents(name.to_string(), dep_names));
173 }
174 }
175
176 self.remove_view(name)
177 }
178
179 pub fn unregister_cascade(&mut self, name: &str) -> Result<Vec<MaterializedView>, MvError> {
187 if !self.views.contains_key(name) {
188 return Err(MvError::ViewNotFound(name.to_string()));
189 }
190
191 let mut to_remove = Vec::new();
193 self.collect_dependents_recursive(name, &mut to_remove);
194 to_remove.push(name.to_string());
195
196 let mut removed = Vec::with_capacity(to_remove.len());
198 for view_name in to_remove {
199 if let Ok(view) = self.remove_view(&view_name) {
200 removed.push(view);
201 }
202 }
203
204 Ok(removed)
205 }
206
207 fn collect_dependents_recursive(&self, name: &str, result: &mut Vec<String>) {
208 if let Some(deps) = self.dependents.get(name) {
209 for dep in deps {
210 if !result.contains(dep) {
211 self.collect_dependents_recursive(dep, result);
212 result.push(dep.clone());
213 }
214 }
215 }
216 }
217
218 fn remove_view(&mut self, name: &str) -> Result<MaterializedView, MvError> {
219 let view = self
220 .views
221 .remove(name)
222 .ok_or_else(|| MvError::ViewNotFound(name.to_string()))?;
223
224 if let Some(sources) = self.dependencies.remove(name) {
226 for source in sources {
227 if let Some(deps) = self.dependents.get_mut(&source) {
228 deps.remove(name);
229 }
230 }
231 }
232 self.dependents.remove(name);
233
234 self.update_topo_order();
236
237 Ok(view)
238 }
239
240 #[must_use]
242 pub fn get(&self, name: &str) -> Option<&MaterializedView> {
243 self.views.get(name)
244 }
245
246 #[must_use]
248 pub fn get_mut(&mut self, name: &str) -> Option<&mut MaterializedView> {
249 self.views.get_mut(name)
250 }
251
252 #[must_use]
254 pub fn topo_order(&self) -> &[String] {
255 &self.topo_order
256 }
257
258 pub fn get_dependents(&self, source: &str) -> impl Iterator<Item = &str> {
260 self.dependents
261 .get(source)
262 .into_iter()
263 .flatten()
264 .map(String::as_str)
265 }
266
267 pub fn get_dependencies(&self, view: &str) -> impl Iterator<Item = &str> {
269 self.dependencies
270 .get(view)
271 .into_iter()
272 .flatten()
273 .map(String::as_str)
274 }
275
276 #[must_use]
278 pub fn len(&self) -> usize {
279 self.views.len()
280 }
281
282 #[must_use]
284 pub fn is_empty(&self) -> bool {
285 self.views.is_empty()
286 }
287
288 pub fn views(&self) -> impl Iterator<Item = &MaterializedView> {
290 self.views.values()
291 }
292
293 #[must_use]
295 pub fn base_tables(&self) -> &FxHashSet<String> {
296 &self.base_tables
297 }
298
299 #[must_use]
303 pub fn dependency_chain(&self, name: &str) -> Vec<String> {
304 let mut chain = Vec::new();
305 let mut visited = FxHashSet::default();
306 self.collect_dependencies_recursive(name, &mut chain, &mut visited);
307 chain
308 }
309
310 fn collect_dependencies_recursive(
311 &self,
312 name: &str,
313 result: &mut Vec<String>,
314 visited: &mut FxHashSet<String>,
315 ) {
316 if !visited.insert(name.to_string()) {
317 return;
318 }
319
320 if let Some(deps) = self.dependencies.get(name) {
321 for dep in deps {
322 self.collect_dependencies_recursive(dep, result, visited);
323 }
324 }
325
326 if self.views.contains_key(name) {
328 result.push(name.to_string());
329 }
330 }
331
332 fn would_create_cycle(&self, new_name: &str, sources: &[String]) -> bool {
333 let mut visited = FxHashSet::default();
335 let mut stack: Vec<_> = sources.to_vec();
336
337 while let Some(current) = stack.pop() {
338 if current == new_name {
339 return true;
340 }
341 if visited.insert(current.clone()) {
342 if let Some(deps) = self.dependencies.get(¤t) {
343 stack.extend(deps.iter().cloned());
344 }
345 }
346 }
347
348 false
349 }
350
351 fn update_topo_order(&mut self) {
352 let mut in_degree: FxHashMap<String, usize> = FxHashMap::default();
354 let mut queue: VecDeque<String> = VecDeque::new();
355
356 for name in self.views.keys() {
358 let deps = self.dependencies.get(name).map_or(0, |d| {
359 d.iter().filter(|dep| self.views.contains_key(*dep)).count()
360 });
361 in_degree.insert(name.clone(), deps);
362 if deps == 0 {
363 queue.push_back(name.clone());
364 }
365 }
366
367 self.topo_order.clear();
369 while let Some(name) = queue.pop_front() {
370 self.topo_order.push(name.clone());
371
372 if let Some(dependents) = self.dependents.get(&name) {
373 for dep in dependents {
374 if let Some(count) = in_degree.get_mut(dep) {
375 *count = count.saturating_sub(1);
376 if *count == 0 {
377 queue.push_back(dep.clone());
378 }
379 }
380 }
381 }
382 }
383 }
384}
385
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a test view called `name` that reads from `sources`.
    fn mv(name: &str, sources: Vec<&str>) -> MaterializedView {
        MaterializedView::simple(name, sources.into_iter().map(String::from).collect())
    }

    #[test]
    fn test_simple_registration() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");

        let view = mv("ohlc_1s", vec!["trades"]);
        registry.register(view).unwrap();

        assert_eq!(registry.len(), 1);
        assert!(registry.get("ohlc_1s").is_some());
    }

    #[test]
    fn test_cascading_registration() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");

        registry.register(mv("ohlc_1s", vec!["trades"])).unwrap();
        registry.register(mv("ohlc_1m", vec!["ohlc_1s"])).unwrap();
        registry.register(mv("ohlc_1h", vec!["ohlc_1m"])).unwrap();

        assert_eq!(registry.topo_order(), &["ohlc_1s", "ohlc_1m", "ohlc_1h"]);
    }

    #[test]
    fn test_duplicate_name_error() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");

        registry.register(mv("ohlc_1s", vec!["trades"])).unwrap();

        let result = registry.register(mv("ohlc_1s", vec!["trades"]));
        assert!(matches!(result, Err(MvError::DuplicateName(_))));
    }

    #[test]
    fn test_source_not_found_error() {
        let mut registry = MvRegistry::new();

        let result = registry.register(mv("view", vec!["nonexistent"]));
        assert!(matches!(result, Err(MvError::SourceNotFound(_))));
    }

    #[test]
    fn test_cycle_detection_direct() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("a");

        registry.register(mv("b", vec!["a"])).unwrap();
        registry.register(mv("c", vec!["b"])).unwrap();

        // A plain extension of the chain is not a cycle.
        registry.register(mv("d", vec!["c"])).unwrap();

        // "b" reads from "a", so a view *named* "a" that reads from "c"
        // would close the loop a -> b -> c -> a.
        let result = registry.register(mv("a", vec!["c"]));
        assert!(matches!(result, Err(MvError::CycleDetected(_))));

        // The rejected registration must not leave any partial state behind.
        assert_eq!(registry.len(), 3);
        assert!(registry.get("a").is_none());
        assert_eq!(registry.topo_order(), &["b", "c", "d"]);
    }

    #[test]
    fn test_cycle_detection_self_reference() {
        let mut registry = MvRegistry::new();
        // The name must resolve as a source for the cycle check to be reached.
        registry.register_base_table("loop");

        let result = registry.register(mv("loop", vec!["loop"]));
        assert!(matches!(result, Err(MvError::CycleDetected(_))));
        assert!(registry.is_empty());
    }

    #[test]
    fn test_multi_source_view() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("orders");
        registry.register_base_table("payments");

        registry
            .register(mv("order_payments", vec!["orders", "payments"]))
            .unwrap();

        assert_eq!(registry.topo_order(), &["order_payments"]);

        let deps: Vec<_> = registry.get_dependencies("order_payments").collect();
        assert!(deps.contains(&"orders"));
        assert!(deps.contains(&"payments"));
    }

    #[test]
    fn test_diamond_dependency() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("source");

        // source -> a, source -> b, and c reads from both a and b.
        registry.register(mv("a", vec!["source"])).unwrap();
        registry.register(mv("b", vec!["source"])).unwrap();
        registry.register(mv("c", vec!["a", "b"])).unwrap();

        let order = registry.topo_order();
        let c_idx = order.iter().position(|x| x == "c").unwrap();
        let a_idx = order.iter().position(|x| x == "a").unwrap();
        let b_idx = order.iter().position(|x| x == "b").unwrap();

        // The relative order of a and b is unspecified; c must follow both.
        assert!(c_idx > a_idx);
        assert!(c_idx > b_idx);
    }

    #[test]
    fn test_unregister_simple() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");
        registry.register(mv("ohlc_1s", vec!["trades"])).unwrap();

        let removed = registry.unregister("ohlc_1s").unwrap();
        assert_eq!(removed.name, "ohlc_1s");
        assert!(registry.is_empty());
    }

    #[test]
    fn test_unregister_with_dependents_error() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");
        registry.register(mv("ohlc_1s", vec!["trades"])).unwrap();
        registry.register(mv("ohlc_1m", vec!["ohlc_1s"])).unwrap();

        let result = registry.unregister("ohlc_1s");
        assert!(matches!(result, Err(MvError::HasDependents(_, _))));
    }

    #[test]
    fn test_unregister_cascade() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");
        registry.register(mv("ohlc_1s", vec!["trades"])).unwrap();
        registry.register(mv("ohlc_1m", vec!["ohlc_1s"])).unwrap();
        registry.register(mv("ohlc_1h", vec!["ohlc_1m"])).unwrap();

        let removed = registry.unregister_cascade("ohlc_1s").unwrap();

        assert_eq!(removed.len(), 3);
        assert!(registry.is_empty());

        // Dependents are removed before the views they read from.
        assert_eq!(removed[0].name, "ohlc_1h");
        assert_eq!(removed[1].name, "ohlc_1m");
        assert_eq!(removed[2].name, "ohlc_1s");
    }

    #[test]
    fn test_dependency_chain() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");
        registry.register(mv("ohlc_1s", vec!["trades"])).unwrap();
        registry.register(mv("ohlc_1m", vec!["ohlc_1s"])).unwrap();
        registry.register(mv("ohlc_1h", vec!["ohlc_1m"])).unwrap();

        let chain = registry.dependency_chain("ohlc_1h");
        assert_eq!(chain, vec!["ohlc_1s", "ohlc_1m", "ohlc_1h"]);
    }

    #[test]
    fn test_get_dependents() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");
        registry.register(mv("a", vec!["trades"])).unwrap();
        registry.register(mv("b", vec!["trades"])).unwrap();
        registry.register(mv("c", vec!["a"])).unwrap();

        // Only direct dependents are reported.
        let dependents: Vec<_> = registry.get_dependents("trades").collect();
        assert!(dependents.contains(&"a"));
        assert!(dependents.contains(&"b"));
        assert!(!dependents.contains(&"c"));

        let a_dependents: Vec<_> = registry.get_dependents("a").collect();
        assert_eq!(a_dependents, vec!["c"]);
    }

    #[test]
    fn test_view_state_update() {
        let mut registry = MvRegistry::new();
        registry.register_base_table("trades");
        registry.register(mv("ohlc_1s", vec!["trades"])).unwrap();

        let view = registry.get_mut("ohlc_1s").unwrap();
        assert_eq!(view.state, MvState::Running);

        view.state = MvState::Dropping;
        assert_eq!(view.state, MvState::Dropping);
    }
}