laminar_core/dag/watermark.rs
1//! DAG-native watermark tracking.
2//!
//! Provides Vec-indexed watermark propagation through a DAG topology, with O(1) per-node lookups.
4//! Watermarks flow from source nodes downstream using min-semantics at
5//! fan-in (merge) nodes, matching the global watermark tracking pattern
6//! but scoped to DAG nodes.
7//!
8//! # Ring 0 Compatibility
9//!
10//! All operations use pre-allocated `Vec<T>` indexed by `NodeId.0` for
11//! zero-allocation lookups, consistent with `DagExecutor`.
12
13use smallvec::SmallVec;
14
15use super::topology::{NodeId, StreamingDag};
16
/// Marker stored in a watermark slot that has never received a value.
///
/// `i64::MIN` is the smallest representable watermark. Any strictly greater
/// value therefore counts as an advance over an unset slot.
const WATERMARK_UNSET: i64 = i64::MIN;
19
/// Checkpoint state for `DagWatermarkTracker`.
///
/// This is a plain snapshot of the tracker's per-node watermark slots. It is
/// produced by `DagWatermarkTracker::checkpoint` and applied with
/// `DagWatermarkTracker::restore`. `PartialEq`/`Eq` are derived so that two
/// snapshots (for example, before and after a restore) can be compared
/// directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DagWatermarkCheckpoint {
    /// Per-node watermark values, indexed by `NodeId.0`.
    ///
    /// A slot holding `i64::MIN` had not received a watermark when the
    /// snapshot was taken.
    pub watermarks: Vec<i64>,
}
26
27/// Vec-indexed O(1) watermark tracker for DAG pipelines.
28///
29/// Propagates watermarks through the DAG using min-semantics:
30/// a node's effective watermark is the minimum of its input sources.
31/// Uses pre-allocated vectors for Ring 0 compatibility.
32#[derive(Debug)]
33pub struct DagWatermarkTracker {
34 /// Per-node watermark values, indexed by NodeId.0.
35 watermarks: Vec<i64>,
36 /// Input dependencies per node (which nodes feed into this node).
37 source_deps: Vec<SmallVec<[NodeId; 4]>>,
38 /// Topological execution order for propagation.
39 execution_order: Vec<NodeId>,
40 /// Pre-allocated buffer for returning updated nodes.
41 updated_buffer: Vec<(NodeId, i64)>,
42 /// Number of slots allocated.
43 slot_count: usize,
44}
45
46impl DagWatermarkTracker {
47 /// Builds a watermark tracker from a finalized DAG.
48 ///
49 /// Extracts the adjacency structure and execution order from the DAG
50 /// for efficient watermark propagation.
51 #[must_use]
52 pub fn from_dag(dag: &StreamingDag) -> Self {
53 let max_id = dag
54 .nodes()
55 .keys()
56 .map(|n| n.0 as usize)
57 .max()
58 .map_or(0, |m| m + 1);
59
60 let mut source_deps = vec![SmallVec::new(); max_id];
61
62 for edge in dag.edges().values() {
63 let tgt = edge.target.0 as usize;
64 if tgt < max_id {
65 source_deps[tgt].push(edge.source);
66 }
67 }
68
69 Self {
70 watermarks: vec![WATERMARK_UNSET; max_id],
71 source_deps,
72 execution_order: dag.execution_order().to_vec(),
73 updated_buffer: Vec::with_capacity(max_id),
74 slot_count: max_id,
75 }
76 }
77
78 /// Updates a source node's watermark and propagates downstream.
79 ///
80 /// Returns a slice of `(NodeId, new_watermark)` pairs for all nodes
81 /// whose effective watermark changed as a result of this update.
82 /// The returned slice is valid until the next call to `update_watermark`.
83 pub fn update_watermark(&mut self, source: NodeId, wm: i64) -> &[(NodeId, i64)] {
84 self.updated_buffer.clear();
85
86 let idx = source.0 as usize;
87 if idx >= self.slot_count {
88 return &self.updated_buffer;
89 }
90
91 // Only update if the new watermark advances
92 if wm <= self.watermarks[idx] {
93 return &self.updated_buffer;
94 }
95
96 self.watermarks[idx] = wm;
97 self.updated_buffer.push((source, wm));
98
99 // Propagate through execution order
100 for &node in &self.execution_order {
101 let n = node.0 as usize;
102 if n >= self.slot_count || self.source_deps[n].is_empty() {
103 continue;
104 }
105
106 // Compute effective watermark = min of all input watermarks
107 let mut min_wm = i64::MAX;
108 let mut all_set = true;
109 for dep in &self.source_deps[n] {
110 let dep_wm = self.watermarks[dep.0 as usize];
111 if dep_wm == WATERMARK_UNSET {
112 all_set = false;
113 break;
114 }
115 min_wm = min_wm.min(dep_wm);
116 }
117
118 if !all_set {
119 continue;
120 }
121
122 if min_wm > self.watermarks[n] {
123 self.watermarks[n] = min_wm;
124 self.updated_buffer.push((node, min_wm));
125 }
126 }
127
128 &self.updated_buffer
129 }
130
131 /// Returns the current watermark for a node, or `None` if unset.
132 #[must_use]
133 pub fn get_watermark(&self, node: NodeId) -> Option<i64> {
134 let idx = node.0 as usize;
135 if idx < self.slot_count && self.watermarks[idx] != WATERMARK_UNSET {
136 Some(self.watermarks[idx])
137 } else {
138 None
139 }
140 }
141
142 /// Returns the effective watermark for a node (min of its inputs).
143 ///
144 /// For source nodes (no inputs), returns the node's own watermark.
145 /// Returns `None` if any input has not yet received a watermark.
146 #[must_use]
147 pub fn effective_watermark(&self, node: NodeId) -> Option<i64> {
148 let idx = node.0 as usize;
149 if idx >= self.slot_count {
150 return None;
151 }
152
153 if self.source_deps[idx].is_empty() {
154 // Source node: return own watermark
155 return self.get_watermark(node);
156 }
157
158 let mut min_wm = i64::MAX;
159 for dep in &self.source_deps[idx] {
160 let dep_wm = self.watermarks[dep.0 as usize];
161 if dep_wm == WATERMARK_UNSET {
162 return None;
163 }
164 min_wm = min_wm.min(dep_wm);
165 }
166 Some(min_wm)
167 }
168
169 /// Creates a checkpoint of the current watermark state.
170 #[must_use]
171 pub fn checkpoint(&self) -> DagWatermarkCheckpoint {
172 DagWatermarkCheckpoint {
173 watermarks: self.watermarks.clone(),
174 }
175 }
176
177 /// Restores watermark state from a checkpoint.
178 pub fn restore(&mut self, checkpoint: &DagWatermarkCheckpoint) {
179 let len = self.slot_count.min(checkpoint.watermarks.len());
180 self.watermarks[..len].copy_from_slice(&checkpoint.watermarks[..len]);
181 }
182
183 /// Returns the number of node slots allocated.
184 #[must_use]
185 pub fn slot_count(&self) -> usize {
186 self.slot_count
187 }
188}