// laminar_core/dag/multicast.rs
1//! Zero-copy multicast buffer for shared intermediate stages.
2//!
3//! [`MulticastBuffer<T>`] implements a pre-allocated ring buffer with per-slot
4//! reference counting for single-producer, multiple-consumer (SPMC) multicast.
5//! Designed for Ring 0 hot path: zero allocations after construction.
6//!
7//! # Design
8//!
9//! - Pre-allocated slots with power-of-2 capacity and bitmask indexing
10//! - Single writer via [`publish()`](MulticastBuffer::publish), multiple
11//!   readers via [`consume()`](MulticastBuffer::consume)
12//! - Per-slot atomic refcount tracks outstanding consumers
13//! - Backpressure: `publish()` fails when slowest consumer hasn't freed a slot
14//!
15//! # Safety
16//!
17//! The single-writer invariant is upheld by the DAG executor,
18//! which ensures exactly one thread calls `publish()` on any given buffer.
19//! Multiple threads may call `consume()` with distinct `consumer_idx` values.
20
21use std::cell::UnsafeCell;
22use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
23
24use super::error::DagError;
25
/// Pre-allocated SPMC multicast buffer with reference-counted slots.
///
/// Provides zero-allocation publish/consume on the hot path. The buffer
/// is constructed in Ring 2 and used in Ring 0.
///
/// # Type Parameters
///
/// * `T` - The event type. Must be `Clone` for consumers (typically
///   `Arc<RecordBatch>` where clone is an O(1) atomic increment).
///
/// # Performance Targets
///
/// | Operation | Target |
/// |-----------|--------|
/// | `publish()` | < 100ns |
/// | `consume()` | < 50ns |
pub struct MulticastBuffer<T> {
    /// Pre-allocated ring buffer slots. `UnsafeCell` is needed because the
    /// single writer mutates slots through `&self` in `publish()`.
    slots: Box<[UnsafeCell<Option<T>>]>,
    /// Per-slot reference counts. 0 = free, N = N consumers still need to read.
    /// Doubles as the backpressure signal: `publish()` refuses to overwrite a
    /// slot whose refcount is nonzero.
    refcounts: Box<[AtomicU32]>,
    /// Monotonically increasing write position (single writer). A `u64`
    /// counter that never wraps in practice; the slot index is `pos & mask`.
    write_pos: AtomicU64,
    /// Per-consumer read positions. Each entry is only advanced by the one
    /// thread owning that `consumer_idx` (see module-level safety notes).
    read_positions: Box<[AtomicU64]>,
    /// Buffer capacity (power of 2).
    capacity: usize,
    /// Bitmask for modular indexing (`capacity - 1`); valid because
    /// `capacity` is a power of 2.
    mask: usize,
    /// Number of consumers; every published slot starts with this refcount.
    consumer_count: u32,
}
58
// SAFETY: MulticastBuffer is designed for SPMC (single-producer, multi-consumer):
// - Single writer thread calls publish() (enforced by DAG executor)
// - Multiple consumer threads call consume() with distinct indices
// - All shared state uses atomic operations with appropriate memory ordering
// - UnsafeCell access is guarded by refcount/write_pos synchronization
// `T: Send` is required because values move from the producer thread to
// consumer threads; no `T: Sync` is needed since consumers clone out of the
// slot rather than sharing long-lived references into it.
unsafe impl<T: Send> Send for MulticastBuffer<T> {}
// SAFETY: See above. Consumers access distinct read_positions entries.
// Slot reads are protected by the write_pos/refcount protocol.
unsafe impl<T: Send> Sync for MulticastBuffer<T> {}
68
69impl<T> MulticastBuffer<T> {
70    /// Creates a new multicast buffer.
71    ///
72    /// Allocates all slots up front (Ring 2). The hot path
73    /// (`publish`/`consume`) is allocation-free.
74    ///
75    /// # Arguments
76    ///
77    /// * `capacity` - Number of slots (must be a power of 2, > 0)
78    /// * `consumer_count` - Number of downstream consumers
79    ///
80    /// # Panics
81    ///
82    /// Panics if `capacity` is not a power of 2 or is 0.
83    #[must_use]
84    pub fn new(capacity: usize, consumer_count: usize) -> Self {
85        assert!(
86            capacity > 0 && capacity.is_power_of_two(),
87            "capacity must be a non-zero power of 2, got {capacity}"
88        );
89
90        let slots: Vec<UnsafeCell<Option<T>>> =
91            (0..capacity).map(|_| UnsafeCell::new(None)).collect();
92        let refcounts: Vec<AtomicU32> = (0..capacity).map(|_| AtomicU32::new(0)).collect();
93        let read_positions: Vec<AtomicU64> =
94            (0..consumer_count).map(|_| AtomicU64::new(0)).collect();
95
96        Self {
97            slots: slots.into_boxed_slice(),
98            refcounts: refcounts.into_boxed_slice(),
99            write_pos: AtomicU64::new(0),
100            read_positions: read_positions.into_boxed_slice(),
101            capacity,
102            mask: capacity - 1,
103            #[allow(clippy::cast_possible_truncation)] // Consumer count bounded by MAX_FAN_OUT (8)
104            consumer_count: consumer_count as u32,
105        }
106    }
107
108    /// Publishes a value to all consumers.
109    ///
110    /// Writes the value into the next available slot and sets the refcount
111    /// to `consumer_count`. All consumers will be able to read this value
112    /// via [`consume()`](Self::consume).
113    ///
114    /// # Errors
115    ///
116    /// Returns [`DagError::BackpressureFull`] if the target slot is still
117    /// in use by a slow consumer (backpressure).
118    ///
119    /// # Safety Contract
120    ///
121    /// Must be called from a single writer thread only. The DAG executor
122    /// enforces this by assigning exactly one producer per shared stage.
123    pub fn publish(&self, value: T) -> Result<(), DagError> {
124        let pos = self.write_pos.load(Ordering::Relaxed);
125        // Bitmask truncates to capacity range, so u64->usize narrowing is safe.
126        #[allow(clippy::cast_possible_truncation)]
127        let slot_idx = (pos as usize) & self.mask;
128
129        // Check if slot is free (all consumers have finished reading).
130        if self.refcounts[slot_idx].load(Ordering::Acquire) != 0 {
131            return Err(DagError::BackpressureFull);
132        }
133
134        // SAFETY: Single writer guarantees exclusive write access to this slot.
135        // The Acquire load of refcount above ensures all consumers have completed
136        // their reads (refcount == 0 means no outstanding readers).
137        unsafe { *self.slots[slot_idx].get() = Some(value) };
138
139        // Set refcount before advancing write_pos. Release ordering ensures
140        // the slot value is visible before consumers can observe the new write_pos.
141        self.refcounts[slot_idx].store(self.consumer_count, Ordering::Release);
142
143        // Advance write position (Release makes new data visible to consumers).
144        self.write_pos.store(pos + 1, Ordering::Release);
145
146        Ok(())
147    }
148
149    /// Consumes the next value for a given consumer.
150    ///
151    /// Returns `None` if no new data is available. Each consumer maintains
152    /// its own read position and will receive every published value in order.
153    ///
154    /// # Arguments
155    ///
156    /// * `consumer_idx` - The consumer's index (0-based, must be < `consumer_count`)
157    ///
158    /// # Panics
159    ///
160    /// Panics in debug mode if `consumer_idx >= consumer_count`.
161    pub fn consume(&self, consumer_idx: usize) -> Option<T>
162    where
163        T: Clone,
164    {
165        debug_assert!(
166            consumer_idx < self.consumer_count as usize,
167            "consumer_idx {consumer_idx} >= consumer_count {}",
168            self.consumer_count
169        );
170
171        let read_pos = self.read_positions[consumer_idx].load(Ordering::Relaxed);
172        let write_pos = self.write_pos.load(Ordering::Acquire);
173
174        if read_pos >= write_pos {
175            return None; // No data available
176        }
177
178        // Bitmask truncates to capacity range, so u64->usize narrowing is safe.
179        #[allow(clippy::cast_possible_truncation)]
180        let slot_idx = (read_pos as usize) & self.mask;
181
182        // SAFETY: write_pos > read_pos guarantees this slot contains valid data.
183        // The Acquire load of write_pos above synchronizes-with the Release store
184        // in publish(), ensuring the slot value is visible.
185        let value = unsafe { (*self.slots[slot_idx].get()).as_ref().unwrap().clone() };
186
187        // Advance read position (prevents re-reading this slot).
188        self.read_positions[consumer_idx].store(read_pos + 1, Ordering::Release);
189
190        // Decrement refcount. AcqRel: the release side ensures our slot read
191        // happens-before the publisher can reuse the slot after seeing refcount == 0.
192        self.refcounts[slot_idx].fetch_sub(1, Ordering::AcqRel);
193
194        Some(value)
195    }
196
197    /// Returns the buffer capacity (number of slots).
198    #[inline]
199    #[must_use]
200    pub fn capacity(&self) -> usize {
201        self.capacity
202    }
203
204    /// Returns the number of consumers.
205    #[inline]
206    #[must_use]
207    pub fn consumer_count(&self) -> u32 {
208        self.consumer_count
209    }
210
211    /// Returns the current write position (total number of publishes).
212    #[inline]
213    #[must_use]
214    pub fn write_position(&self) -> u64 {
215        self.write_pos.load(Ordering::Relaxed)
216    }
217
218    /// Returns the current read position for a consumer.
219    ///
220    /// # Panics
221    ///
222    /// Panics if `consumer_idx` is out of bounds.
223    #[inline]
224    #[must_use]
225    pub fn read_position(&self, consumer_idx: usize) -> u64 {
226        self.read_positions[consumer_idx].load(Ordering::Relaxed)
227    }
228}
229
230impl<T> std::fmt::Debug for MulticastBuffer<T> {
231    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
232        f.debug_struct("MulticastBuffer")
233            .field("capacity", &self.capacity)
234            .field("consumer_count", &self.consumer_count)
235            .field("write_pos", &self.write_pos.load(Ordering::Relaxed))
236            .finish_non_exhaustive()
237    }
238}