Skip to main content

laminar_core/dag/
watermark.rs

1//! DAG-native watermark tracking.
2//!
//! Provides Vec-indexed watermark propagation through a DAG topology, with
//! O(1) per-node lookups; each advancing update makes one pass over the
//! topological execution order. Watermarks flow from source nodes downstream
//! using min-semantics at fan-in (merge) nodes, matching the global watermark
//! tracking pattern but scoped to DAG nodes.
7//!
8//! # Ring 0 Compatibility
9//!
//! Lookups and propagation use pre-allocated `Vec<T>` storage indexed by
//! `NodeId.0`, so they avoid allocation on the hot path, consistent with
//! `DagExecutor`. `checkpoint()` is the exception: it clones the watermark vector.
12
13use smallvec::SmallVec;
14
15use super::topology::{NodeId, StreamingDag};
16
/// Marker stored in a node slot that has not observed any watermark yet.
///
/// `i64::MIN` is the smallest representable watermark, so no incoming
/// watermark can be "behind" it and it never collides with a real value
/// that has advanced.
const WATERMARK_UNSET: i64 = i64::MIN;
19
/// Snapshot of a [`DagWatermarkTracker`]'s per-node watermarks.
///
/// Produced by `DagWatermarkTracker::checkpoint` and consumed by
/// `DagWatermarkTracker::restore`.
#[derive(Debug, Clone)]
pub struct DagWatermarkCheckpoint {
    /// One watermark per node slot, indexed by `NodeId.0`; slots that never
    /// received a watermark hold `i64::MIN`.
    pub watermarks: Vec<i64>,
}
26
27/// Vec-indexed O(1) watermark tracker for DAG pipelines.
28///
29/// Propagates watermarks through the DAG using min-semantics:
30/// a node's effective watermark is the minimum of its input sources.
31/// Uses pre-allocated vectors for Ring 0 compatibility.
32#[derive(Debug)]
33pub struct DagWatermarkTracker {
34    /// Per-node watermark values, indexed by NodeId.0.
35    watermarks: Vec<i64>,
36    /// Input dependencies per node (which nodes feed into this node).
37    source_deps: Vec<SmallVec<[NodeId; 4]>>,
38    /// Topological execution order for propagation.
39    execution_order: Vec<NodeId>,
40    /// Pre-allocated buffer for returning updated nodes.
41    updated_buffer: Vec<(NodeId, i64)>,
42    /// Number of slots allocated.
43    slot_count: usize,
44}
45
46impl DagWatermarkTracker {
47    /// Builds a watermark tracker from a finalized DAG.
48    ///
49    /// Extracts the adjacency structure and execution order from the DAG
50    /// for efficient watermark propagation.
51    #[must_use]
52    pub fn from_dag(dag: &StreamingDag) -> Self {
53        let max_id = dag
54            .nodes()
55            .keys()
56            .map(|n| n.0 as usize)
57            .max()
58            .map_or(0, |m| m + 1);
59
60        let mut source_deps = vec![SmallVec::new(); max_id];
61
62        for edge in dag.edges().values() {
63            let tgt = edge.target.0 as usize;
64            if tgt < max_id {
65                source_deps[tgt].push(edge.source);
66            }
67        }
68
69        Self {
70            watermarks: vec![WATERMARK_UNSET; max_id],
71            source_deps,
72            execution_order: dag.execution_order().to_vec(),
73            updated_buffer: Vec::with_capacity(max_id),
74            slot_count: max_id,
75        }
76    }
77
78    /// Updates a source node's watermark and propagates downstream.
79    ///
80    /// Returns a slice of `(NodeId, new_watermark)` pairs for all nodes
81    /// whose effective watermark changed as a result of this update.
82    /// The returned slice is valid until the next call to `update_watermark`.
83    pub fn update_watermark(&mut self, source: NodeId, wm: i64) -> &[(NodeId, i64)] {
84        self.updated_buffer.clear();
85
86        let idx = source.0 as usize;
87        if idx >= self.slot_count {
88            return &self.updated_buffer;
89        }
90
91        // Only update if the new watermark advances
92        if wm <= self.watermarks[idx] {
93            return &self.updated_buffer;
94        }
95
96        self.watermarks[idx] = wm;
97        self.updated_buffer.push((source, wm));
98
99        // Propagate through execution order
100        for &node in &self.execution_order {
101            let n = node.0 as usize;
102            if n >= self.slot_count || self.source_deps[n].is_empty() {
103                continue;
104            }
105
106            // Compute effective watermark = min of all input watermarks
107            let mut min_wm = i64::MAX;
108            let mut all_set = true;
109            for dep in &self.source_deps[n] {
110                let dep_wm = self.watermarks[dep.0 as usize];
111                if dep_wm == WATERMARK_UNSET {
112                    all_set = false;
113                    break;
114                }
115                min_wm = min_wm.min(dep_wm);
116            }
117
118            if !all_set {
119                continue;
120            }
121
122            if min_wm > self.watermarks[n] {
123                self.watermarks[n] = min_wm;
124                self.updated_buffer.push((node, min_wm));
125            }
126        }
127
128        &self.updated_buffer
129    }
130
131    /// Returns the current watermark for a node, or `None` if unset.
132    #[must_use]
133    pub fn get_watermark(&self, node: NodeId) -> Option<i64> {
134        let idx = node.0 as usize;
135        if idx < self.slot_count && self.watermarks[idx] != WATERMARK_UNSET {
136            Some(self.watermarks[idx])
137        } else {
138            None
139        }
140    }
141
142    /// Returns the effective watermark for a node (min of its inputs).
143    ///
144    /// For source nodes (no inputs), returns the node's own watermark.
145    /// Returns `None` if any input has not yet received a watermark.
146    #[must_use]
147    pub fn effective_watermark(&self, node: NodeId) -> Option<i64> {
148        let idx = node.0 as usize;
149        if idx >= self.slot_count {
150            return None;
151        }
152
153        if self.source_deps[idx].is_empty() {
154            // Source node: return own watermark
155            return self.get_watermark(node);
156        }
157
158        let mut min_wm = i64::MAX;
159        for dep in &self.source_deps[idx] {
160            let dep_wm = self.watermarks[dep.0 as usize];
161            if dep_wm == WATERMARK_UNSET {
162                return None;
163            }
164            min_wm = min_wm.min(dep_wm);
165        }
166        Some(min_wm)
167    }
168
169    /// Creates a checkpoint of the current watermark state.
170    #[must_use]
171    pub fn checkpoint(&self) -> DagWatermarkCheckpoint {
172        DagWatermarkCheckpoint {
173            watermarks: self.watermarks.clone(),
174        }
175    }
176
177    /// Restores watermark state from a checkpoint.
178    pub fn restore(&mut self, checkpoint: &DagWatermarkCheckpoint) {
179        let len = self.slot_count.min(checkpoint.watermarks.len());
180        self.watermarks[..len].copy_from_slice(&checkpoint.watermarks[..len]);
181    }
182
183    /// Returns the number of node slots allocated.
184    #[must_use]
185    pub fn slot_count(&self) -> usize {
186        self.slot_count
187    }
188}