Skip to main content

laminar_core/dag/
changelog.rs

1//! DAG-native changelog propagation.
2//!
3//! Manages per-node `ChangelogBuffer` instances for tracking Z-set changes
4//! through a DAG pipeline. Each node can record insertions, deletions, and
5//! retraction pairs that are drained by Ring 1 for checkpointing and sink
6//! delivery.
7//!
8//! # Ring 0 Compatibility
9//!
10//! All operations use Vec-indexed storage and delegate to `ChangelogBuffer`
11//! which is pre-allocated and zero-allocation after warmup.
12
13use std::fmt;
14
15use super::topology::{NodeId, StreamingDag};
16use crate::operator::changelog::{ChangelogBuffer, ChangelogRef};
17
18/// Per-node changelog buffer management for DAG pipelines.
19///
20/// Wraps a `ChangelogBuffer` per node, with global and per-node enable flags
21/// for zero-overhead disabled mode.
22pub struct DagChangelogPropagator {
23    /// Per-node changelog buffers, indexed by NodeId.0.
24    buffers: Vec<ChangelogBuffer>,
25    /// Per-node enable flags.
26    enabled: Vec<bool>,
27    /// Number of slots.
28    slot_count: usize,
29    /// Global enable flag (fast path to skip all recording).
30    globally_enabled: bool,
31}
32
33impl fmt::Debug for DagChangelogPropagator {
34    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35        f.debug_struct("DagChangelogPropagator")
36            .field("slot_count", &self.slot_count)
37            .field("globally_enabled", &self.globally_enabled)
38            .field("enabled", &self.enabled)
39            .finish_non_exhaustive()
40    }
41}
42
43impl DagChangelogPropagator {
44    /// Creates a propagator from a finalized DAG with the given buffer capacity.
45    ///
46    /// All nodes start enabled. Use `set_node_enabled` to selectively disable.
47    #[must_use]
48    pub fn from_dag(dag: &StreamingDag, capacity: usize) -> Self {
49        let max_id = dag
50            .nodes()
51            .keys()
52            .map(|n| n.0 as usize)
53            .max()
54            .map_or(0, |m| m + 1);
55
56        let buffers = (0..max_id)
57            .map(|_| ChangelogBuffer::with_capacity(capacity))
58            .collect();
59        let enabled = vec![true; max_id];
60
61        Self {
62            buffers,
63            enabled,
64            slot_count: max_id,
65            globally_enabled: true,
66        }
67    }
68
69    /// Creates a disabled propagator (zero overhead).
70    ///
71    /// No buffers are allocated. All `record` calls return `false` immediately.
72    #[must_use]
73    pub fn disabled(slot_count: usize) -> Self {
74        Self {
75            buffers: (0..slot_count)
76                .map(|_| ChangelogBuffer::with_capacity(0))
77                .collect(),
78            enabled: vec![false; slot_count],
79            slot_count,
80            globally_enabled: false,
81        }
82    }
83
84    /// Records a changelog reference at a node.
85    ///
86    /// Returns `true` if the reference was recorded, `false` if the propagator
87    /// is disabled, the node is disabled, or the buffer is full (backpressure).
88    #[inline]
89    pub fn record(&mut self, node: NodeId, changelog_ref: ChangelogRef) -> bool {
90        if !self.globally_enabled {
91            return false;
92        }
93        let idx = node.0 as usize;
94        if idx >= self.slot_count || !self.enabled[idx] {
95            return false;
96        }
97        self.buffers[idx].push(changelog_ref)
98    }
99
100    /// Records a retraction pair (update-before + update-after) at a node.
101    ///
102    /// Returns `true` if both references were recorded.
103    #[inline]
104    pub fn record_retraction(
105        &mut self,
106        node: NodeId,
107        batch_offset: u32,
108        old_row_index: u32,
109        new_row_index: u32,
110    ) -> bool {
111        if !self.globally_enabled {
112            return false;
113        }
114        let idx = node.0 as usize;
115        if idx >= self.slot_count || !self.enabled[idx] {
116            return false;
117        }
118        self.buffers[idx].push_retraction(batch_offset, old_row_index, new_row_index)
119    }
120
121    /// Drains all changelog references from a node's buffer.
122    ///
123    /// Returns the drained references. The buffer is cleared but retains capacity.
124    pub fn drain_node(&mut self, node: NodeId) -> Vec<ChangelogRef> {
125        let idx = node.0 as usize;
126        if idx >= self.slot_count {
127            return Vec::new();
128        }
129        self.buffers[idx].drain().collect()
130    }
131
132    /// Drains all changelog references from all node buffers.
133    pub fn drain_all(&mut self) {
134        for buffer in &mut self.buffers {
135            buffer.clear();
136        }
137    }
138
139    /// Returns `true` if any node has pending changelog references.
140    #[must_use]
141    pub fn has_pending(&self) -> bool {
142        if !self.globally_enabled {
143            return false;
144        }
145        self.buffers.iter().any(|b| !b.is_empty())
146    }
147
148    /// Returns the number of pending changelog references at a node.
149    #[must_use]
150    pub fn pending_count(&self, node: NodeId) -> usize {
151        let idx = node.0 as usize;
152        if idx >= self.slot_count {
153            return 0;
154        }
155        self.buffers[idx].len()
156    }
157
158    /// Enables or disables changelog recording for a specific node.
159    pub fn set_node_enabled(&mut self, node: NodeId, enabled: bool) {
160        let idx = node.0 as usize;
161        if idx < self.slot_count {
162            self.enabled[idx] = enabled;
163        }
164    }
165
166    /// Enables or disables changelog recording globally.
167    pub fn set_globally_enabled(&mut self, enabled: bool) {
168        self.globally_enabled = enabled;
169    }
170
171    /// Returns whether changelog recording is globally enabled.
172    #[must_use]
173    pub fn is_globally_enabled(&self) -> bool {
174        self.globally_enabled
175    }
176
177    /// Returns the number of node slots.
178    #[must_use]
179    pub fn slot_count(&self) -> usize {
180        self.slot_count
181    }
182}