laminar_core/dag/
builder.rs1use arrow_schema::SchemaRef;
7use rustc_hash::FxHashMap;
8
9use super::error::DagError;
10use super::topology::{DagNodeType, StreamingDag};
11
12pub struct DagBuilder {
39 nodes: Vec<(String, DagNodeType, SchemaRef)>,
41 edges: Vec<(String, String)>,
43 name_index: FxHashMap<String, usize>,
45}
46
47impl DagBuilder {
48 #[must_use]
50 pub fn new() -> Self {
51 Self {
52 nodes: Vec::new(),
53 edges: Vec::new(),
54 name_index: FxHashMap::default(),
55 }
56 }
57
58 #[must_use]
63 pub fn source(mut self, name: &str, schema: SchemaRef) -> Self {
64 let idx = self.nodes.len();
65 self.nodes
66 .push((name.to_string(), DagNodeType::Source, schema));
67 self.name_index.insert(name.to_string(), idx);
68 self
69 }
70
71 #[must_use]
73 pub fn operator(mut self, name: &str, schema: SchemaRef) -> Self {
74 let idx = self.nodes.len();
75 self.nodes
76 .push((name.to_string(), DagNodeType::StatefulOperator, schema));
77 self.name_index.insert(name.to_string(), idx);
78 self
79 }
80
81 #[must_use]
83 pub fn stateless_operator(mut self, name: &str, schema: SchemaRef) -> Self {
84 let idx = self.nodes.len();
85 self.nodes
86 .push((name.to_string(), DagNodeType::StatelessOperator, schema));
87 self.name_index.insert(name.to_string(), idx);
88 self
89 }
90
91 #[must_use]
93 pub fn materialized_view(mut self, name: &str, schema: SchemaRef) -> Self {
94 let idx = self.nodes.len();
95 self.nodes
96 .push((name.to_string(), DagNodeType::MaterializedView, schema));
97 self.name_index.insert(name.to_string(), idx);
98 self
99 }
100
101 #[must_use]
103 pub fn connect(mut self, from: &str, to: &str) -> Self {
104 self.edges.push((from.to_string(), to.to_string()));
105 self
106 }
107
108 #[must_use]
114 pub fn fan_out<F>(mut self, shared_node: &str, branches: F) -> Self
115 where
116 F: FnOnce(FanOutBuilder) -> FanOutBuilder,
117 {
118 let fan_out = branches(FanOutBuilder::new());
119 for (branch_name, branch_type, branch_schema) in fan_out.branches {
120 let idx = self.nodes.len();
121 self.nodes
122 .push((branch_name.clone(), branch_type, branch_schema));
123 self.name_index.insert(branch_name.clone(), idx);
124 self.edges.push((shared_node.to_string(), branch_name));
125 }
126 self
127 }
128
129 #[must_use]
131 pub fn sink_for(mut self, upstream: &str, sink_name: &str, schema: SchemaRef) -> Self {
132 let idx = self.nodes.len();
133 self.nodes
134 .push((sink_name.to_string(), DagNodeType::Sink, schema));
135 self.name_index.insert(sink_name.to_string(), idx);
136 self.edges
137 .push((upstream.to_string(), sink_name.to_string()));
138 self
139 }
140
141 #[must_use]
145 pub fn sink(mut self, name: &str, schema: SchemaRef) -> Self {
146 let idx = self.nodes.len();
147 self.nodes
148 .push((name.to_string(), DagNodeType::Sink, schema));
149 self.name_index.insert(name.to_string(), idx);
150 self
151 }
152
153 pub fn build(self) -> Result<StreamingDag, DagError> {
167 let mut dag = StreamingDag::new();
168
169 for (name, node_type, schema) in self.nodes {
171 dag.add_node(name, node_type, schema)?;
172 }
173
174 for (from, to) in self.edges {
176 let from_id = dag
177 .node_id_by_name(&from)
178 .ok_or_else(|| DagError::NodeNotFound(from.clone()))?;
179 let to_id = dag
180 .node_id_by_name(&to)
181 .ok_or_else(|| DagError::NodeNotFound(to.clone()))?;
182 dag.add_edge(from_id, to_id)?;
183 }
184
185 dag.finalize()?;
187
188 Ok(dag)
189 }
190}
191
192impl Default for DagBuilder {
193 fn default() -> Self {
194 Self::new()
195 }
196}
197
198pub struct FanOutBuilder {
200 branches: Vec<(String, DagNodeType, SchemaRef)>,
201}
202
203impl FanOutBuilder {
204 fn new() -> Self {
205 Self {
206 branches: Vec::new(),
207 }
208 }
209
210 #[must_use]
212 pub fn branch(mut self, name: &str, schema: SchemaRef) -> Self {
213 self.branches
214 .push((name.to_string(), DagNodeType::StatefulOperator, schema));
215 self
216 }
217
218 #[must_use]
220 pub fn stateless_branch(mut self, name: &str, schema: SchemaRef) -> Self {
221 self.branches
222 .push((name.to_string(), DagNodeType::StatelessOperator, schema));
223 self
224 }
225}