// laminar_core/dag/multicast.rs
1//! Zero-copy multicast buffer for shared intermediate stages.
2//!
3//! [`MulticastBuffer<T>`] implements a pre-allocated ring buffer with per-slot
4//! reference counting for single-producer, multiple-consumer (SPMC) multicast.
5//! Designed for Ring 0 hot path: zero allocations after construction.
6//!
7//! # Design
8//!
9//! - Pre-allocated slots with power-of-2 capacity and bitmask indexing
10//! - Single writer via [`publish()`](MulticastBuffer::publish), multiple
11//! readers via [`consume()`](MulticastBuffer::consume)
12//! - Per-slot atomic refcount tracks outstanding consumers
13//! - Backpressure: `publish()` fails when slowest consumer hasn't freed a slot
14//!
15//! # Safety
16//!
17//! The single-writer invariant is upheld by the DAG executor,
18//! which ensures exactly one thread calls `publish()` on any given buffer.
19//! Multiple threads may call `consume()` with distinct `consumer_idx` values.
20
21use std::cell::UnsafeCell;
22use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
23
24use super::error::DagError;
25
/// Pre-allocated SPMC multicast buffer with reference-counted slots.
///
/// All storage is allocated at construction time (Ring 2); `publish` and
/// `consume` are allocation-free on the hot path (Ring 0).
///
/// # Type Parameters
///
/// * `T` - The event type. Consumers require `T: Clone` (typically
///   `Arc<RecordBatch>`, where a clone is an O(1) refcount increment).
///
/// # Performance Targets
///
/// | Operation | Target |
/// |-----------|--------|
/// | `publish()` | < 100ns |
/// | `consume()` | < 50ns |
pub struct MulticastBuffer<T> {
    /// Monotonically increasing write position; stored only by the single writer.
    write_pos: AtomicU64,
    /// One read cursor per consumer, indexed by `consumer_idx`.
    read_positions: Box<[AtomicU64]>,
    /// Fixed ring of value slots, `capacity` entries long.
    slots: Box<[UnsafeCell<Option<T>>]>,
    /// Outstanding-reader count per slot: 0 = reusable, N = N consumers pending.
    refcounts: Box<[AtomicU32]>,
    /// Total slot count; always a non-zero power of 2.
    capacity: usize,
    /// `capacity - 1`; ANDing a position with this reduces it modulo `capacity`.
    mask: usize,
    /// How many consumers each published value must reach.
    consumer_count: u32,
}
58
59// SAFETY: MulticastBuffer is designed for SPMC (single-producer, multi-consumer):
60// - Single writer thread calls publish() (enforced by DAG executor)
61// - Multiple consumer threads call consume() with distinct indices
62// - All shared state uses atomic operations with appropriate memory ordering
63// - UnsafeCell access is guarded by refcount/write_pos synchronization
64unsafe impl<T: Send> Send for MulticastBuffer<T> {}
65// SAFETY: See above. Consumers access distinct read_positions entries.
66// Slot reads are protected by the write_pos/refcount protocol.
67unsafe impl<T: Send> Sync for MulticastBuffer<T> {}
68
impl<T> MulticastBuffer<T> {
    /// Creates a new multicast buffer.
    ///
    /// Allocates all slots up front (Ring 2). The hot path
    /// (`publish`/`consume`) is allocation-free.
    ///
    /// # Arguments
    ///
    /// * `capacity` - Number of slots (must be a power of 2, > 0)
    /// * `consumer_count` - Number of downstream consumers
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is not a power of 2 or is 0.
    #[must_use]
    pub fn new(capacity: usize, consumer_count: usize) -> Self {
        assert!(
            capacity > 0 && capacity.is_power_of_two(),
            "capacity must be a non-zero power of 2, got {capacity}"
        );

        // The only allocations in this type's lifetime happen here.
        let slots: Vec<UnsafeCell<Option<T>>> =
            (0..capacity).map(|_| UnsafeCell::new(None)).collect();
        let refcounts: Vec<AtomicU32> = (0..capacity).map(|_| AtomicU32::new(0)).collect();
        let read_positions: Vec<AtomicU64> =
            (0..consumer_count).map(|_| AtomicU64::new(0)).collect();

        Self {
            slots: slots.into_boxed_slice(),
            refcounts: refcounts.into_boxed_slice(),
            write_pos: AtomicU64::new(0),
            read_positions: read_positions.into_boxed_slice(),
            capacity,
            // Power-of-2 capacity (asserted above) makes `pos & mask`
            // equivalent to `pos % capacity`.
            mask: capacity - 1,
            #[allow(clippy::cast_possible_truncation)] // Consumer count bounded by MAX_FAN_OUT (8)
            consumer_count: consumer_count as u32,
        }
    }

    /// Publishes a value to all consumers.
    ///
    /// Writes the value into the next available slot and sets the refcount
    /// to `consumer_count`. All consumers will be able to read this value
    /// via [`consume()`](Self::consume).
    ///
    /// Note: with `consumer_count == 0` the refcount is stored as 0, so the
    /// slot is immediately reusable and the value is never delivered.
    ///
    /// # Errors
    ///
    /// Returns [`DagError::BackpressureFull`] if the target slot is still
    /// in use by a slow consumer (backpressure).
    ///
    /// # Safety Contract
    ///
    /// Must be called from a single writer thread only. The DAG executor
    /// enforces this by assigning exactly one producer per shared stage.
    pub fn publish(&self, value: T) -> Result<(), DagError> {
        // Relaxed is sufficient: write_pos is stored only by this (single)
        // writer thread, which always observes its own last store.
        let pos = self.write_pos.load(Ordering::Relaxed);
        // Bitmask truncates to capacity range, so u64->usize narrowing is safe.
        #[allow(clippy::cast_possible_truncation)]
        let slot_idx = (pos as usize) & self.mask;

        // Check if slot is free (all consumers have finished reading).
        if self.refcounts[slot_idx].load(Ordering::Acquire) != 0 {
            return Err(DagError::BackpressureFull);
        }

        // SAFETY: Single writer guarantees exclusive write access to this slot.
        // The Acquire load of refcount above ensures all consumers have completed
        // their reads (refcount == 0 means no outstanding readers).
        unsafe { *self.slots[slot_idx].get() = Some(value) };

        // Set refcount before advancing write_pos. Release ordering ensures
        // the slot value is visible before consumers can observe the new write_pos.
        self.refcounts[slot_idx].store(self.consumer_count, Ordering::Release);

        // Advance write position (Release makes new data visible to consumers).
        self.write_pos.store(pos + 1, Ordering::Release);

        Ok(())
    }

    /// Consumes the next value for a given consumer.
    ///
    /// Returns `None` if no new data is available. Each consumer maintains
    /// its own read position and will receive every published value in order.
    ///
    /// # Arguments
    ///
    /// * `consumer_idx` - The consumer's index (0-based, must be < `consumer_count`)
    ///
    /// # Panics
    ///
    /// Panics if `consumer_idx >= consumer_count`: via the `debug_assert!`
    /// below in debug builds, or via the `read_positions` slice bounds check
    /// in release builds.
    pub fn consume(&self, consumer_idx: usize) -> Option<T>
    where
        T: Clone,
    {
        debug_assert!(
            consumer_idx < self.consumer_count as usize,
            "consumer_idx {consumer_idx} >= consumer_count {}",
            self.consumer_count
        );

        // Relaxed is sufficient for read_pos: only this consumer stores it
        // (callers use distinct consumer_idx values per the module contract).
        let read_pos = self.read_positions[consumer_idx].load(Ordering::Relaxed);
        let write_pos = self.write_pos.load(Ordering::Acquire);

        if read_pos >= write_pos {
            return None; // No data available
        }

        // Bitmask truncates to capacity range, so u64->usize narrowing is safe.
        #[allow(clippy::cast_possible_truncation)]
        let slot_idx = (read_pos as usize) & self.mask;

        // SAFETY: write_pos > read_pos guarantees this slot contains valid data.
        // The Acquire load of write_pos above synchronizes-with the Release store
        // in publish(), ensuring the slot value is visible.
        let value = unsafe { (*self.slots[slot_idx].get()).as_ref().unwrap().clone() };

        // Advance read position (prevents re-reading this slot).
        self.read_positions[consumer_idx].store(read_pos + 1, Ordering::Release);

        // Decrement refcount. AcqRel: the release side ensures our slot read
        // happens-before the publisher can reuse the slot after seeing refcount == 0.
        self.refcounts[slot_idx].fetch_sub(1, Ordering::AcqRel);

        Some(value)
    }

    /// Returns the buffer capacity (number of slots).
    #[inline]
    #[must_use]
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Returns the number of consumers.
    #[inline]
    #[must_use]
    pub fn consumer_count(&self) -> u32 {
        self.consumer_count
    }

    /// Returns the current write position (total number of publishes).
    #[inline]
    #[must_use]
    pub fn write_position(&self) -> u64 {
        self.write_pos.load(Ordering::Relaxed)
    }

    /// Returns the current read position for a consumer.
    ///
    /// # Panics
    ///
    /// Panics if `consumer_idx` is out of bounds.
    #[inline]
    #[must_use]
    pub fn read_position(&self, consumer_idx: usize) -> u64 {
        self.read_positions[consumer_idx].load(Ordering::Relaxed)
    }
}
229
230impl<T> std::fmt::Debug for MulticastBuffer<T> {
231 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
232 f.debug_struct("MulticastBuffer")
233 .field("capacity", &self.capacity)
234 .field("consumer_count", &self.consumer_count)
235 .field("write_pos", &self.write_pos.load(Ordering::Relaxed))
236 .finish_non_exhaustive()
237 }
238}