// laminar_core/dag/topology.rs

1//! DAG topology data structures.
2//!
3//! Defines `DagNode`, `DagEdge`, and `StreamingDag` with topological ordering,
4//! cycle detection, and automatic channel type derivation.
5
6use std::collections::VecDeque;
7use std::fmt;
8use std::sync::Arc;
9
10use arrow_schema::SchemaRef;
11use rustc_hash::{FxHashMap, FxHashSet};
12use smallvec::SmallVec;
13
14use super::error::DagError;
15
/// Upper bound on the number of fan-out targets per node output port.
///
/// Sized to match the routing table entry size. The limit is compared
/// against each node's outgoing edge count when the DAG is validated or
/// finalized.
pub const MAX_FAN_OUT: usize = 8;
20
/// Handle that uniquely identifies one node of the DAG.
///
/// A thin, copyable wrapper around a `u32`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct NodeId(pub u32);

impl fmt::Display for NodeId {
    /// Renders the id as `NodeId(<n>)`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(raw) = self;
        write!(f, "NodeId({raw})")
    }
}
30
/// Handle that uniquely identifies one edge of the DAG.
///
/// A thin, copyable wrapper around a `u32`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct EdgeId(pub u32);

impl fmt::Display for EdgeId {
    /// Renders the id as `EdgeId(<n>)`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(raw) = self;
        write!(f, "EdgeId({raw})")
    }
}
40
/// Handle that uniquely identifies a state partition.
///
/// A thin, copyable wrapper around a `u32`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct StatePartitionId(pub u32);
44
/// Kind of a DAG node, used for ring assignment.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DagNodeType {
    /// External data source (Connector SDK source, Ring 1).
    Source,
    /// Operator without state, such as map, filter or project (Ring 0).
    StatelessOperator,
    /// Operator with state, such as window, join or aggregate (Ring 0).
    StatefulOperator,
    /// Materialized view wrapping an `MvPipelineExecutor` node (Ring 0).
    MaterializedView,
    /// External data sink (Connector SDK sink, Ring 1).
    Sink,
}
59
/// Channel flavour computed from the DAG topology.
///
/// Builds on the existing `ChannelMode` in `streaming::channel` (SPSC and
/// MPSC) by adding SPMC for fan-out. Users never pick a channel type
/// themselves: it follows from the fan-in and fan-out of the nodes an edge
/// connects.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DagChannelType {
    /// One producer, one consumer (best fit for linear segments).
    Spsc,
    /// One producer, several consumers (fan-out / shared stage).
    Spmc,
    /// Several producers, one consumer (fan-in).
    Mpsc,
}
75
/// User-supplied partitioner: maps a byte slice to a `usize`.
///
/// Shared behind an `Arc` so strategies holding it stay cheap to clone.
pub type PartitionFn = Arc<dyn Fn(&[u8]) -> usize + Send + Sync>;

/// How events are spread across parallel operator instances.
///
/// Defaults to [`PartitioningStrategy::Single`].
#[derive(Clone, Default)]
pub enum PartitioningStrategy {
    /// Everything goes to one partition (no parallelism).
    #[default]
    Single,
    /// Partitions are chosen in round-robin fashion.
    RoundRobin,
    /// Partition is chosen by hashing the given key expression.
    HashBy(String),
    /// Partition is chosen by a caller-provided function.
    Custom(PartitionFn),
}

impl fmt::Debug for PartitioningStrategy {
    /// Prints the variant name; `HashBy` includes its key unquoted and
    /// `Custom` hides the closure behind `...` since it has no `Debug` form.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::HashBy(key) => write!(f, "HashBy({key})"),
            Self::Single => f.write_str("Single"),
            Self::RoundRobin => f.write_str("RoundRobin"),
            Self::Custom(_) => f.write_str("Custom(...)"),
        }
    }
}
103
104/// A node in the DAG represents an operator or stage.
105///
106/// Nodes are created during topology construction in Ring 2
107/// and are immutable once the DAG is finalized.
108pub struct DagNode {
109    /// Unique node identifier.
110    pub id: NodeId,
111    /// Human-readable name (e.g., "normalize", "vwap").
112    pub name: String,
113    /// Upstream connections (fan-in). `SmallVec` avoids heap alloc for <= 4 inputs.
114    pub inputs: SmallVec<[EdgeId; 4]>,
115    /// Downstream connections (fan-out). `SmallVec` avoids heap alloc for <= 4 outputs.
116    pub outputs: SmallVec<[EdgeId; 4]>,
117    /// Output schema for downstream type checking.
118    pub output_schema: SchemaRef,
119    /// State partition assignment (for thread-per-core routing).
120    pub state_partition: StatePartitionId,
121    /// Node classification for ring assignment.
122    pub node_type: DagNodeType,
123}
124
125impl fmt::Debug for DagNode {
126    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
127        f.debug_struct("DagNode")
128            .field("id", &self.id)
129            .field("name", &self.name)
130            .field("inputs", &self.inputs)
131            .field("outputs", &self.outputs)
132            .field("node_type", &self.node_type)
133            .field("state_partition", &self.state_partition)
134            .finish_non_exhaustive()
135    }
136}
137
138/// An edge represents a data flow connection between two nodes.
139///
140/// The channel type is automatically derived from the topology,
141/// consistent with the auto-upgrade pattern in `streaming::channel`.
142#[derive(Debug)]
143pub struct DagEdge {
144    /// Unique edge identifier.
145    pub id: EdgeId,
146    /// Source node.
147    pub source: NodeId,
148    /// Target node.
149    pub target: NodeId,
150    /// Channel type (derived from topology analysis, never user-specified).
151    pub channel_type: DagChannelType,
152    /// Partitioning strategy for parallel execution.
153    pub partitioning: PartitioningStrategy,
154    /// Output port on the source node (for multi-output operators).
155    pub source_port: u8,
156    /// Input port on the target node (for multi-input operators).
157    pub target_port: u8,
158}
159
160/// Metadata for a shared intermediate stage.
161///
162/// Created during topology finalization for any node with fan-out > 1.
163#[derive(Debug)]
164pub struct SharedStageMetadata {
165    /// Node that produces the shared output.
166    pub producer_node: NodeId,
167    /// Number of downstream consumers.
168    pub consumer_count: usize,
169    /// Consumer node IDs.
170    pub consumer_nodes: Vec<NodeId>,
171}
172
173/// The complete DAG topology.
174///
175/// Constructed in Ring 2 via `DagBuilder` or from SQL `CREATE MATERIALIZED VIEW`
176/// chains. Once built, the topology is immutable and can be executed in Ring 0.
177pub struct StreamingDag {
178    /// All nodes in the DAG, keyed by `NodeId`.
179    nodes: FxHashMap<NodeId, DagNode>,
180    /// All edges in the DAG, keyed by `EdgeId`.
181    edges: FxHashMap<EdgeId, DagEdge>,
182    /// Topologically sorted execution order (dependencies first).
183    /// Computed via Kahn's algorithm.
184    execution_order: Vec<NodeId>,
185    /// Shared intermediate stage metadata (nodes with fan-out > 1).
186    shared_stages: FxHashMap<NodeId, SharedStageMetadata>,
187    /// Source nodes (entry points, no inputs).
188    source_nodes: Vec<NodeId>,
189    /// Sink nodes (exit points, no outputs).
190    sink_nodes: Vec<NodeId>,
191    /// Name -> `NodeId` index for lookups.
192    name_index: FxHashMap<String, NodeId>,
193    /// Next node ID counter.
194    next_node_id: u32,
195    /// Next edge ID counter.
196    next_edge_id: u32,
197    /// Whether the DAG has been finalized.
198    finalized: bool,
199}
200
201impl fmt::Debug for StreamingDag {
202    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
203        f.debug_struct("StreamingDag")
204            .field("node_count", &self.nodes.len())
205            .field("edge_count", &self.edges.len())
206            .field("source_nodes", &self.source_nodes)
207            .field("sink_nodes", &self.sink_nodes)
208            .field("execution_order", &self.execution_order)
209            .field("finalized", &self.finalized)
210            .finish_non_exhaustive()
211    }
212}
213
214impl StreamingDag {
215    /// Creates a new empty DAG.
216    #[must_use]
217    pub fn new() -> Self {
218        Self {
219            nodes: FxHashMap::default(),
220            edges: FxHashMap::default(),
221            execution_order: Vec::new(),
222            shared_stages: FxHashMap::default(),
223            source_nodes: Vec::new(),
224            sink_nodes: Vec::new(),
225            name_index: FxHashMap::default(),
226            next_node_id: 0,
227            next_edge_id: 0,
228            finalized: false,
229        }
230    }
231
232    /// Adds a node to the DAG.
233    ///
234    /// # Errors
235    ///
236    /// Returns `DagError::DuplicateNode` if a node with the same name exists.
237    pub fn add_node(
238        &mut self,
239        name: impl Into<String>,
240        node_type: DagNodeType,
241        output_schema: SchemaRef,
242    ) -> Result<NodeId, DagError> {
243        let name = name.into();
244        if self.name_index.contains_key(&name) {
245            return Err(DagError::DuplicateNode(name));
246        }
247
248        let id = NodeId(self.next_node_id);
249        self.next_node_id += 1;
250
251        let node = DagNode {
252            id,
253            name: name.clone(),
254            inputs: SmallVec::new(),
255            outputs: SmallVec::new(),
256            output_schema,
257            state_partition: StatePartitionId(id.0),
258            node_type,
259        };
260
261        self.nodes.insert(id, node);
262        self.name_index.insert(name, id);
263        self.finalized = false;
264
265        Ok(id)
266    }
267
268    /// Adds an edge between two nodes.
269    ///
270    /// # Errors
271    ///
272    /// Returns `DagError::NodeNotFound` if either node does not exist.
273    /// Returns `DagError::CycleDetected` if the edge would create a self-loop.
274    pub fn add_edge(&mut self, source: NodeId, target: NodeId) -> Result<EdgeId, DagError> {
275        // Self-loop check
276        if source == target {
277            let name = self.node_name(source).unwrap_or_default();
278            return Err(DagError::CycleDetected(name));
279        }
280
281        if !self.nodes.contains_key(&source) {
282            return Err(DagError::NodeNotFound(format!("{source}")));
283        }
284        if !self.nodes.contains_key(&target) {
285            return Err(DagError::NodeNotFound(format!("{target}")));
286        }
287
288        let id = EdgeId(self.next_edge_id);
289        self.next_edge_id += 1;
290
291        // Port indices are bounded by MAX_FAN_OUT (8), so truncation is safe.
292        #[allow(clippy::cast_possible_truncation)]
293        let source_port = self.nodes.get(&source).map_or(0, |n| n.outputs.len() as u8);
294        #[allow(clippy::cast_possible_truncation)]
295        let target_port = self.nodes.get(&target).map_or(0, |n| n.inputs.len() as u8);
296
297        let edge = DagEdge {
298            id,
299            source,
300            target,
301            channel_type: DagChannelType::Spsc, // Derived during finalize
302            partitioning: PartitioningStrategy::default(),
303            source_port,
304            target_port,
305        };
306
307        self.edges.insert(id, edge);
308
309        // Update node adjacency lists
310        if let Some(node) = self.nodes.get_mut(&source) {
311            node.outputs.push(id);
312        }
313        if let Some(node) = self.nodes.get_mut(&target) {
314            node.inputs.push(id);
315        }
316
317        self.finalized = false;
318
319        Ok(id)
320    }
321
322    /// Finalizes the DAG: validates topology, computes execution order,
323    /// derives channel types, and identifies shared stages.
324    ///
325    /// # Errors
326    ///
327    /// Returns `DagError::EmptyDag` if the DAG has no nodes.
328    /// Returns `DagError::CycleDetected` if the graph contains cycles.
329    /// Returns `DagError::DisconnectedNode` if a non-source, non-sink node
330    /// has no inputs or no outputs.
331    /// Returns `DagError::FanOutLimitExceeded` if any node exceeds the
332    /// maximum fan-out limit.
333    pub fn finalize(&mut self) -> Result<(), DagError> {
334        if self.nodes.is_empty() {
335            return Err(DagError::EmptyDag);
336        }
337
338        self.check_fan_out_limits()?;
339        self.compute_execution_order()?;
340        self.check_connected()?;
341        self.derive_channel_types();
342        self.identify_shared_stages();
343        self.classify_source_sink_nodes();
344        self.finalized = true;
345
346        Ok(())
347    }
348
349    /// Validates the DAG topology without modifying internal state.
350    ///
351    /// # Errors
352    ///
353    /// Returns errors for cycles, disconnected nodes, or schema mismatches.
354    pub fn validate(&self) -> Result<(), DagError> {
355        if self.nodes.is_empty() {
356            return Err(DagError::EmptyDag);
357        }
358        self.check_fan_out_limits()?;
359        self.check_acyclic()?;
360        self.check_connected()?;
361        self.check_schemas()?;
362        Ok(())
363    }
364
365    // ---- Accessors ----
366
367    /// Returns the number of nodes in the DAG.
368    #[must_use]
369    pub fn node_count(&self) -> usize {
370        self.nodes.len()
371    }
372
373    /// Returns the number of edges in the DAG.
374    #[must_use]
375    pub fn edge_count(&self) -> usize {
376        self.edges.len()
377    }
378
379    /// Returns a reference to a node by ID.
380    #[must_use]
381    pub fn node(&self, id: NodeId) -> Option<&DagNode> {
382        self.nodes.get(&id)
383    }
384
385    /// Returns a reference to an edge by ID.
386    #[must_use]
387    pub fn edge(&self, id: EdgeId) -> Option<&DagEdge> {
388        self.edges.get(&id)
389    }
390
391    /// Returns all nodes.
392    #[must_use]
393    pub fn nodes(&self) -> &FxHashMap<NodeId, DagNode> {
394        &self.nodes
395    }
396
397    /// Returns all edges.
398    #[must_use]
399    pub fn edges(&self) -> &FxHashMap<EdgeId, DagEdge> {
400        &self.edges
401    }
402
403    /// Returns the `NodeId` for a given node name.
404    #[must_use]
405    pub fn node_id_by_name(&self, name: &str) -> Option<NodeId> {
406        self.name_index.get(name).copied()
407    }
408
409    /// Returns the node name for a given `NodeId`.
410    #[must_use]
411    pub fn node_name(&self, id: NodeId) -> Option<String> {
412        self.nodes.get(&id).map(|n| n.name.clone())
413    }
414
415    /// Returns the number of outgoing edges from a node.
416    #[inline]
417    #[must_use]
418    pub fn outgoing_edge_count(&self, node: NodeId) -> usize {
419        self.nodes.get(&node).map_or(0, |n| n.outputs.len())
420    }
421
422    /// Returns the number of incoming edges to a node.
423    #[inline]
424    #[must_use]
425    pub fn incoming_edge_count(&self, node: NodeId) -> usize {
426        self.nodes.get(&node).map_or(0, |n| n.inputs.len())
427    }
428
429    /// Returns all source nodes (nodes with no inputs).
430    #[must_use]
431    pub fn sources(&self) -> &[NodeId] {
432        &self.source_nodes
433    }
434
435    /// Returns all sink nodes (nodes with no outputs).
436    #[must_use]
437    pub fn sinks(&self) -> &[NodeId] {
438        &self.sink_nodes
439    }
440
441    /// Returns nodes in topological execution order (dependencies first).
442    #[must_use]
443    pub fn execution_order(&self) -> &[NodeId] {
444        &self.execution_order
445    }
446
447    /// Returns metadata for shared stages (nodes with fan-out > 1).
448    #[must_use]
449    pub fn shared_stages(&self) -> &FxHashMap<NodeId, SharedStageMetadata> {
450        &self.shared_stages
451    }
452
453    /// Returns whether the DAG has been finalized.
454    #[must_use]
455    pub fn is_finalized(&self) -> bool {
456        self.finalized
457    }
458
459    // ---- Internal validation methods ----
460
461    /// Checks that no node exceeds the maximum fan-out limit.
462    fn check_fan_out_limits(&self) -> Result<(), DagError> {
463        for node in self.nodes.values() {
464            if node.outputs.len() > MAX_FAN_OUT {
465                return Err(DagError::FanOutLimitExceeded {
466                    node: node.name.clone(),
467                    count: node.outputs.len(),
468                    max: MAX_FAN_OUT,
469                });
470            }
471        }
472        Ok(())
473    }
474
475    /// Checks that the graph is acyclic using Kahn's algorithm.
476    ///
477    /// If the number of nodes in the topological order is less than
478    /// the total number of nodes, a cycle exists.
479    fn check_acyclic(&self) -> Result<(), DagError> {
480        let (order, _) = self.kahn_topo_sort();
481        if order.len() < self.nodes.len() {
482            // Find a node involved in the cycle (not in the order)
483            let ordered_set: FxHashSet<NodeId> = order.into_iter().collect();
484            for node in self.nodes.values() {
485                if !ordered_set.contains(&node.id) {
486                    return Err(DagError::CycleDetected(node.name.clone()));
487                }
488            }
489            return Err(DagError::CycleDetected("unknown".to_string()));
490        }
491        Ok(())
492    }
493
494    /// Checks that all non-source, non-sink nodes have both inputs and outputs.
495    fn check_connected(&self) -> Result<(), DagError> {
496        for node in self.nodes.values() {
497            match node.node_type {
498                DagNodeType::Source => {
499                    if node.outputs.is_empty() {
500                        return Err(DagError::DisconnectedNode(node.name.clone()));
501                    }
502                }
503                DagNodeType::Sink => {
504                    if node.inputs.is_empty() {
505                        return Err(DagError::DisconnectedNode(node.name.clone()));
506                    }
507                }
508                _ => {
509                    if node.inputs.is_empty() && node.outputs.is_empty() {
510                        return Err(DagError::DisconnectedNode(node.name.clone()));
511                    }
512                }
513            }
514        }
515        Ok(())
516    }
517
518    /// Validates schema compatibility for connected edges.
519    ///
520    /// Source node schema must be compatible with the target node.
521    /// For now, checks that field count and types match.
522    fn check_schemas(&self) -> Result<(), DagError> {
523        for edge in self.edges.values() {
524            let source_node = self.nodes.get(&edge.source);
525            let target_node = self.nodes.get(&edge.target);
526
527            if let (Some(source), Some(target)) = (source_node, target_node) {
528                let source_schema = &source.output_schema;
529                let target_schema = &target.output_schema;
530
531                // Empty schemas are compatible with anything (type-erased).
532                if source_schema.fields().is_empty() || target_schema.fields().is_empty() {
533                    continue;
534                }
535
536                // Check field count and types
537                if source_schema.fields().len() != target_schema.fields().len() {
538                    return Err(DagError::SchemaMismatch {
539                        source_node: source.name.clone(),
540                        target_node: target.name.clone(),
541                        reason: format!(
542                            "field count mismatch: {} vs {}",
543                            source_schema.fields().len(),
544                            target_schema.fields().len()
545                        ),
546                    });
547                }
548
549                for (sf, tf) in source_schema
550                    .fields()
551                    .iter()
552                    .zip(target_schema.fields().iter())
553                {
554                    if sf.data_type() != tf.data_type() {
555                        return Err(DagError::SchemaMismatch {
556                            source_node: source.name.clone(),
557                            target_node: target.name.clone(),
558                            reason: format!(
559                                "type mismatch for field '{}': {:?} vs '{}':{:?}",
560                                sf.name(),
561                                sf.data_type(),
562                                tf.name(),
563                                tf.data_type()
564                            ),
565                        });
566                    }
567                }
568            }
569        }
570        Ok(())
571    }
572
573    /// Computes topological execution order using Kahn's algorithm.
574    ///
575    /// Also detects cycles: if the resulting order has fewer nodes
576    /// than the DAG, a cycle exists.
577    fn compute_execution_order(&mut self) -> Result<(), DagError> {
578        let (order, processed) = self.kahn_topo_sort();
579        if processed < self.nodes.len() {
580            let ordered_set: FxHashSet<NodeId> = order.iter().copied().collect();
581            for node in self.nodes.values() {
582                if !ordered_set.contains(&node.id) {
583                    return Err(DagError::CycleDetected(node.name.clone()));
584                }
585            }
586            return Err(DagError::CycleDetected("unknown".to_string()));
587        }
588        self.execution_order = order;
589        Ok(())
590    }
591
592    /// Kahn's algorithm for topological sort.
593    ///
594    /// Returns `(ordered_node_ids, count_of_processed_nodes)`.
595    fn kahn_topo_sort(&self) -> (Vec<NodeId>, usize) {
596        // Compute in-degree for each node
597        let mut in_degree: FxHashMap<NodeId, usize> = FxHashMap::default();
598        for node in self.nodes.values() {
599            in_degree.entry(node.id).or_insert(0);
600        }
601        for edge in self.edges.values() {
602            *in_degree.entry(edge.target).or_insert(0) += 1;
603        }
604
605        // Start with all nodes that have in-degree 0
606        let mut queue: VecDeque<NodeId> = VecDeque::new();
607        for (&node_id, &deg) in &in_degree {
608            if deg == 0 {
609                queue.push_back(node_id);
610            }
611        }
612
613        // Sort initial queue by NodeId for deterministic ordering
614        let mut initial: Vec<NodeId> = queue.drain(..).collect();
615        initial.sort_by_key(|n| n.0);
616        for id in initial {
617            queue.push_back(id);
618        }
619
620        let mut order = Vec::with_capacity(self.nodes.len());
621        let mut processed = 0;
622
623        while let Some(node_id) = queue.pop_front() {
624            order.push(node_id);
625            processed += 1;
626
627            // Collect and sort successors for deterministic ordering
628            if let Some(node) = self.nodes.get(&node_id) {
629                let mut successors: Vec<NodeId> = Vec::new();
630                for &edge_id in &node.outputs {
631                    if let Some(edge) = self.edges.get(&edge_id) {
632                        let target = edge.target;
633                        if let Some(deg) = in_degree.get_mut(&target) {
634                            *deg = deg.saturating_sub(1);
635                            if *deg == 0 {
636                                successors.push(target);
637                            }
638                        }
639                    }
640                }
641                successors.sort_by_key(|n| n.0);
642                queue.extend(successors);
643            }
644        }
645
646        (order, processed)
647    }
648
649    /// Derives channel types from the topology structure.
650    ///
651    /// - SPSC: source has 1 output, target has 1 input
652    /// - SPMC: source has >1 outputs (fan-out)
653    /// - MPSC: target has >1 inputs (fan-in)
654    fn derive_channel_types(&mut self) {
655        let edge_ids: Vec<EdgeId> = self.edges.keys().copied().collect();
656
657        for edge_id in edge_ids {
658            let (source_fan_out, target_fan_in) = {
659                let edge = &self.edges[&edge_id];
660                (
661                    self.outgoing_edge_count(edge.source),
662                    self.incoming_edge_count(edge.target),
663                )
664            };
665
666            let channel_type = match (target_fan_in > 1, source_fan_out > 1) {
667                (false, false) => DagChannelType::Spsc,
668                (false, true) => DagChannelType::Spmc,
669                // MPSC when target has multiple inputs (fan-in), regardless of
670                // whether source also fans out. Each edge is MPSC from the
671                // target's perspective.
672                (true, _) => DagChannelType::Mpsc,
673            };
674
675            if let Some(edge) = self.edges.get_mut(&edge_id) {
676                edge.channel_type = channel_type;
677            }
678        }
679    }
680
681    /// Identifies shared intermediate stages (nodes with fan-out > 1).
682    fn identify_shared_stages(&mut self) {
683        self.shared_stages.clear();
684
685        for node in self.nodes.values() {
686            if node.outputs.len() > 1 {
687                let consumer_nodes: Vec<NodeId> = node
688                    .outputs
689                    .iter()
690                    .filter_map(|&edge_id| self.edges.get(&edge_id).map(|e| e.target))
691                    .collect();
692
693                self.shared_stages.insert(
694                    node.id,
695                    SharedStageMetadata {
696                        producer_node: node.id,
697                        consumer_count: consumer_nodes.len(),
698                        consumer_nodes,
699                    },
700                );
701            }
702        }
703    }
704
705    /// Classifies source and sink nodes based on connectivity.
706    fn classify_source_sink_nodes(&mut self) {
707        self.source_nodes.clear();
708        self.sink_nodes.clear();
709
710        for node in self.nodes.values() {
711            if node.inputs.is_empty() {
712                self.source_nodes.push(node.id);
713            }
714            if node.outputs.is_empty() {
715                self.sink_nodes.push(node.id);
716            }
717        }
718
719        // Sort for deterministic ordering
720        self.source_nodes.sort_by_key(|n| n.0);
721        self.sink_nodes.sort_by_key(|n| n.0);
722    }
723}
724
725impl StreamingDag {
726    /// Constructs a DAG from an `MvRegistry` and base table schemas.
727    ///
728    /// Creates a Source node for each base table and a `MaterializedView` node
729    /// for each MV, then connects them according to the registry's dependency
730    /// graph. The resulting DAG is finalized and ready for execution.
731    ///
732    /// # Arguments
733    ///
734    /// * `registry` - The MV registry with dependency information
735    /// * `base_table_schemas` - Schemas for each base table referenced by MVs
736    ///
737    /// # Errors
738    ///
739    /// Returns `DagError::EmptyDag` if the registry is empty and has no base tables.
740    /// Returns `DagError::BaseTableSchemaNotFound` if a base table schema is missing.
741    pub fn from_mv_registry(
742        registry: &crate::mv::MvRegistry,
743        base_table_schemas: &FxHashMap<String, SchemaRef>,
744    ) -> Result<Self, DagError> {
745        if registry.is_empty() && registry.base_tables().is_empty() {
746            return Err(DagError::EmptyDag);
747        }
748
749        let mut dag = Self::new();
750
751        // Add source nodes for base tables that are actually referenced by MVs
752        for base_table in registry.base_tables() {
753            let schema = base_table_schemas
754                .get(base_table)
755                .ok_or_else(|| DagError::BaseTableSchemaNotFound(base_table.clone()))?;
756            dag.add_node(base_table, DagNodeType::Source, schema.clone())?;
757        }
758
759        // Add MV nodes in topological order
760        for mv_name in registry.topo_order() {
761            let mv = registry
762                .get(mv_name)
763                .ok_or_else(|| DagError::NodeNotFound(mv_name.clone()))?;
764            dag.add_node(mv_name, DagNodeType::MaterializedView, mv.schema.clone())?;
765        }
766
767        // Connect edges: for each MV, connect its sources to it
768        for mv_name in registry.topo_order() {
769            let mv = registry
770                .get(mv_name)
771                .ok_or_else(|| DagError::NodeNotFound(mv_name.clone()))?;
772            let target_id = dag
773                .node_id_by_name(mv_name)
774                .ok_or_else(|| DagError::NodeNotFound(mv_name.clone()))?;
775            for source_name in &mv.sources {
776                let source_id = dag
777                    .node_id_by_name(source_name)
778                    .ok_or_else(|| DagError::NodeNotFound(source_name.clone()))?;
779                dag.add_edge(source_id, target_id)?;
780            }
781        }
782
783        dag.finalize()?;
784        Ok(dag)
785    }
786}
787
788impl Default for StreamingDag {
789    fn default() -> Self {
790        Self::new()
791    }
792}