laminar_core/dag/
changelog.rs1use std::fmt;
14
15use super::topology::{NodeId, StreamingDag};
16use crate::operator::changelog::{ChangelogBuffer, ChangelogRef};
17
18pub struct DagChangelogPropagator {
23 buffers: Vec<ChangelogBuffer>,
25 enabled: Vec<bool>,
27 slot_count: usize,
29 globally_enabled: bool,
31}
32
33impl fmt::Debug for DagChangelogPropagator {
34 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35 f.debug_struct("DagChangelogPropagator")
36 .field("slot_count", &self.slot_count)
37 .field("globally_enabled", &self.globally_enabled)
38 .field("enabled", &self.enabled)
39 .finish_non_exhaustive()
40 }
41}
42
43impl DagChangelogPropagator {
44 #[must_use]
48 pub fn from_dag(dag: &StreamingDag, capacity: usize) -> Self {
49 let max_id = dag
50 .nodes()
51 .keys()
52 .map(|n| n.0 as usize)
53 .max()
54 .map_or(0, |m| m + 1);
55
56 let buffers = (0..max_id)
57 .map(|_| ChangelogBuffer::with_capacity(capacity))
58 .collect();
59 let enabled = vec![true; max_id];
60
61 Self {
62 buffers,
63 enabled,
64 slot_count: max_id,
65 globally_enabled: true,
66 }
67 }
68
69 #[must_use]
73 pub fn disabled(slot_count: usize) -> Self {
74 Self {
75 buffers: (0..slot_count)
76 .map(|_| ChangelogBuffer::with_capacity(0))
77 .collect(),
78 enabled: vec![false; slot_count],
79 slot_count,
80 globally_enabled: false,
81 }
82 }
83
84 #[inline]
89 pub fn record(&mut self, node: NodeId, changelog_ref: ChangelogRef) -> bool {
90 if !self.globally_enabled {
91 return false;
92 }
93 let idx = node.0 as usize;
94 if idx >= self.slot_count || !self.enabled[idx] {
95 return false;
96 }
97 self.buffers[idx].push(changelog_ref)
98 }
99
100 #[inline]
104 pub fn record_retraction(
105 &mut self,
106 node: NodeId,
107 batch_offset: u32,
108 old_row_index: u32,
109 new_row_index: u32,
110 ) -> bool {
111 if !self.globally_enabled {
112 return false;
113 }
114 let idx = node.0 as usize;
115 if idx >= self.slot_count || !self.enabled[idx] {
116 return false;
117 }
118 self.buffers[idx].push_retraction(batch_offset, old_row_index, new_row_index)
119 }
120
121 pub fn drain_node(&mut self, node: NodeId) -> Vec<ChangelogRef> {
125 let idx = node.0 as usize;
126 if idx >= self.slot_count {
127 return Vec::new();
128 }
129 self.buffers[idx].drain().collect()
130 }
131
132 pub fn drain_all(&mut self) {
134 for buffer in &mut self.buffers {
135 buffer.clear();
136 }
137 }
138
139 #[must_use]
141 pub fn has_pending(&self) -> bool {
142 if !self.globally_enabled {
143 return false;
144 }
145 self.buffers.iter().any(|b| !b.is_empty())
146 }
147
148 #[must_use]
150 pub fn pending_count(&self, node: NodeId) -> usize {
151 let idx = node.0 as usize;
152 if idx >= self.slot_count {
153 return 0;
154 }
155 self.buffers[idx].len()
156 }
157
158 pub fn set_node_enabled(&mut self, node: NodeId, enabled: bool) {
160 let idx = node.0 as usize;
161 if idx < self.slot_count {
162 self.enabled[idx] = enabled;
163 }
164 }
165
166 pub fn set_globally_enabled(&mut self, enabled: bool) {
168 self.globally_enabled = enabled;
169 }
170
171 #[must_use]
173 pub fn is_globally_enabled(&self) -> bool {
174 self.globally_enabled
175 }
176
177 #[must_use]
179 pub fn slot_count(&self) -> usize {
180 self.slot_count
181 }
182}