Skip to main content

laminar_core/dag/
builder.rs

//! DAG builder API for programmatic topology construction.
//!
//! Provides `DagBuilder` for fluent DAG construction in Ring 2 and
//! `FanOutBuilder` for creating fan-out branches from shared stages.
5
6use arrow_schema::SchemaRef;
7use rustc_hash::FxHashMap;
8
9use super::error::DagError;
10use super::topology::{DagNodeType, StreamingDag};
11
12/// Fluent builder for constructing `StreamingDag` topologies.
13///
14/// Constructed in Ring 2. Once `build()` is called, the topology is
15/// immutable and can be executed in Ring 0.
16///
17/// # Example
18///
19/// ```rust,ignore
20/// use laminar_core::dag::DagBuilder;
21///
22/// let dag = DagBuilder::new()
23///     .source("trades", schema.clone())
24///     .operator("dedup", schema.clone())
25///     .operator("normalize", schema.clone())
26///     .connect("trades", "dedup")
27///     .connect("dedup", "normalize")
28///     .fan_out("normalize", |b| {
29///         b.branch("vwap", schema.clone())
30///          .branch("anomaly", schema.clone())
31///          .branch("position", schema.clone())
32///     })
33///     .sink_for("vwap", "analytics_sink", schema.clone())
34///     .sink_for("anomaly", "alert_sink", schema.clone())
35///     .sink_for("position", "risk_sink", schema.clone())
36///     .build()?;
37/// ```
38pub struct DagBuilder {
39    /// Nodes being constructed.
40    nodes: Vec<(String, DagNodeType, SchemaRef)>,
41    /// Edges being constructed.
42    edges: Vec<(String, String)>,
43    /// Name -> index mapping for duplicate detection.
44    name_index: FxHashMap<String, usize>,
45}
46
47impl DagBuilder {
48    /// Creates a new DAG builder.
49    #[must_use]
50    pub fn new() -> Self {
51        Self {
52            nodes: Vec::new(),
53            edges: Vec::new(),
54            name_index: FxHashMap::default(),
55        }
56    }
57
58    /// Adds a source node to the DAG.
59    ///
60    /// Source nodes are entry points that receive data from external
61    /// systems via the Connector SDK.
62    #[must_use]
63    pub fn source(mut self, name: &str, schema: SchemaRef) -> Self {
64        let idx = self.nodes.len();
65        self.nodes
66            .push((name.to_string(), DagNodeType::Source, schema));
67        self.name_index.insert(name.to_string(), idx);
68        self
69    }
70
71    /// Adds a stateful operator node to the DAG.
72    #[must_use]
73    pub fn operator(mut self, name: &str, schema: SchemaRef) -> Self {
74        let idx = self.nodes.len();
75        self.nodes
76            .push((name.to_string(), DagNodeType::StatefulOperator, schema));
77        self.name_index.insert(name.to_string(), idx);
78        self
79    }
80
81    /// Adds a stateless operator node to the DAG.
82    #[must_use]
83    pub fn stateless_operator(mut self, name: &str, schema: SchemaRef) -> Self {
84        let idx = self.nodes.len();
85        self.nodes
86            .push((name.to_string(), DagNodeType::StatelessOperator, schema));
87        self.name_index.insert(name.to_string(), idx);
88        self
89    }
90
91    /// Adds a materialized view node to the DAG.
92    #[must_use]
93    pub fn materialized_view(mut self, name: &str, schema: SchemaRef) -> Self {
94        let idx = self.nodes.len();
95        self.nodes
96            .push((name.to_string(), DagNodeType::MaterializedView, schema));
97        self.name_index.insert(name.to_string(), idx);
98        self
99    }
100
101    /// Connects two nodes with an edge.
102    #[must_use]
103    pub fn connect(mut self, from: &str, to: &str) -> Self {
104        self.edges.push((from.to_string(), to.to_string()));
105        self
106    }
107
108    /// Creates a fan-out from a shared stage to multiple branches.
109    ///
110    /// The `branches` closure receives a `FanOutBuilder` that allows
111    /// adding multiple downstream branches from the shared node.
112    /// Each branch is automatically connected to the shared node.
113    #[must_use]
114    pub fn fan_out<F>(mut self, shared_node: &str, branches: F) -> Self
115    where
116        F: FnOnce(FanOutBuilder) -> FanOutBuilder,
117    {
118        let fan_out = branches(FanOutBuilder::new());
119        for (branch_name, branch_type, branch_schema) in fan_out.branches {
120            let idx = self.nodes.len();
121            self.nodes
122                .push((branch_name.clone(), branch_type, branch_schema));
123            self.name_index.insert(branch_name.clone(), idx);
124            self.edges.push((shared_node.to_string(), branch_name));
125        }
126        self
127    }
128
129    /// Adds a sink node and connects it to an upstream node.
130    #[must_use]
131    pub fn sink_for(mut self, upstream: &str, sink_name: &str, schema: SchemaRef) -> Self {
132        let idx = self.nodes.len();
133        self.nodes
134            .push((sink_name.to_string(), DagNodeType::Sink, schema));
135        self.name_index.insert(sink_name.to_string(), idx);
136        self.edges
137            .push((upstream.to_string(), sink_name.to_string()));
138        self
139    }
140
141    /// Adds a sink node to the DAG (without auto-connecting).
142    ///
143    /// Use `connect()` to manually wire upstream nodes.
144    #[must_use]
145    pub fn sink(mut self, name: &str, schema: SchemaRef) -> Self {
146        let idx = self.nodes.len();
147        self.nodes
148            .push((name.to_string(), DagNodeType::Sink, schema));
149        self.name_index.insert(name.to_string(), idx);
150        self
151    }
152
153    /// Builds the immutable DAG topology.
154    ///
155    /// Validates the topology (acyclic, connected, schema-compatible),
156    /// computes topological order, derives channel types, and identifies
157    /// shared stages.
158    ///
159    /// # Errors
160    ///
161    /// Returns `DagError::CycleDetected` if the graph contains cycles.
162    /// Returns `DagError::DisconnectedNode` if a node has no connections.
163    /// Returns `DagError::NodeNotFound` if an edge references a missing node.
164    /// Returns `DagError::DuplicateNode` if nodes have duplicate names.
165    /// Returns `DagError::EmptyDag` if no nodes were added.
166    pub fn build(self) -> Result<StreamingDag, DagError> {
167        let mut dag = StreamingDag::new();
168
169        // Add nodes
170        for (name, node_type, schema) in self.nodes {
171            dag.add_node(name, node_type, schema)?;
172        }
173
174        // Add edges
175        for (from, to) in self.edges {
176            let from_id = dag
177                .node_id_by_name(&from)
178                .ok_or_else(|| DagError::NodeNotFound(from.clone()))?;
179            let to_id = dag
180                .node_id_by_name(&to)
181                .ok_or_else(|| DagError::NodeNotFound(to.clone()))?;
182            dag.add_edge(from_id, to_id)?;
183        }
184
185        // Validate and finalize
186        dag.finalize()?;
187
188        Ok(dag)
189    }
190}
191
192impl Default for DagBuilder {
193    fn default() -> Self {
194        Self::new()
195    }
196}
197
198/// Builder for fan-out branches from a shared stage.
199pub struct FanOutBuilder {
200    branches: Vec<(String, DagNodeType, SchemaRef)>,
201}
202
203impl FanOutBuilder {
204    fn new() -> Self {
205        Self {
206            branches: Vec::new(),
207        }
208    }
209
210    /// Adds a stateful operator branch from the shared stage.
211    #[must_use]
212    pub fn branch(mut self, name: &str, schema: SchemaRef) -> Self {
213        self.branches
214            .push((name.to_string(), DagNodeType::StatefulOperator, schema));
215        self
216    }
217
218    /// Adds a stateless operator branch from the shared stage.
219    #[must_use]
220    pub fn stateless_branch(mut self, name: &str, schema: SchemaRef) -> Self {
221        self.branches
222            .push((name.to_string(), DagNodeType::StatelessOperator, schema));
223        self
224    }
225}