Skip to main content

laminar_core/tpc/
spsc.rs

1//! # SPSC Queue
2//!
3//! Lock-free single-producer single-consumer bounded queue optimized for
4//! inter-core communication in thread-per-core architectures.
5//!
6//! ## Design
7//!
8//! - Cache-line padded head/tail indices prevent false sharing
9//! - Power-of-2 capacity for fast modulo via bitmask
10//! - Acquire/Release memory ordering for lock-free operation
11//! - Batch operations for throughput optimization
12//!
13//! ## Performance
14//!
15//! Target: < 50ns per push/pop operation.
16
17use std::cell::UnsafeCell;
18use std::mem::MaybeUninit;
19use std::sync::atomic::{AtomicUsize, Ordering};
20
/// A wrapper that pads a value to a 128-byte boundary to prevent false sharing.
///
/// False sharing occurs when two threads access different data that happens to
/// reside on the same cache line, causing unnecessary cache invalidations.
/// We use 128-byte alignment (two cache lines) because Intel's L2 spatial
/// prefetcher on Skylake+ fetches adjacent 64-byte line pairs, which can cause
/// false sharing effects that 64-byte padding alone does not prevent.
///
/// # Example
///
/// ```rust
/// use laminar_core::tpc::CachePadded;
/// use std::sync::atomic::AtomicUsize;
///
/// // Each counter gets its own cache line pair
/// let counter1 = CachePadded::new(AtomicUsize::new(0));
/// let counter2 = CachePadded::new(AtomicUsize::new(0));
///
/// // Access the inner value
/// assert_eq!(counter1.load(std::sync::atomic::Ordering::Relaxed), 0);
/// ```
#[repr(C, align(128))]
pub struct CachePadded<T> {
    // The wrapped value; `align(128)` on the struct supplies the padding,
    // so no explicit filler bytes are needed here.
    value: T,
}
46
// SAFETY: CachePadded is Send if T is Send — sending the wrapper is exactly
// sending its single `value: T` field.
// NOTE(review): these manual impls appear redundant; the compiler auto-derives
// Send/Sync for a struct whose only field is `T`. Confirm before removing.
#[allow(unsafe_code)]
unsafe impl<T: Send> Send for CachePadded<T> {}

// SAFETY: CachePadded is Sync if T is Sync — sharing `&CachePadded<T>` across
// threads only ever exposes `&T`.
#[allow(unsafe_code)]
unsafe impl<T: Sync> Sync for CachePadded<T> {}
54
55impl<T> CachePadded<T> {
56    /// Creates a new cache-padded value.
57    #[must_use]
58    pub const fn new(value: T) -> Self {
59        Self { value }
60    }
61
62    /// Returns a reference to the inner value.
63    #[must_use]
64    pub const fn get(&self) -> &T {
65        &self.value
66    }
67
68    /// Returns a mutable reference to the inner value.
69    pub fn get_mut(&mut self) -> &mut T {
70        &mut self.value
71    }
72
73    /// Consumes the wrapper and returns the inner value.
74    #[must_use]
75    pub fn into_inner(self) -> T {
76        self.value
77    }
78}
79
80impl<T> std::ops::Deref for CachePadded<T> {
81    type Target = T;
82
83    fn deref(&self) -> &Self::Target {
84        &self.value
85    }
86}
87
88impl<T> std::ops::DerefMut for CachePadded<T> {
89    fn deref_mut(&mut self) -> &mut Self::Target {
90        &mut self.value
91    }
92}
93
94impl<T: Default> Default for CachePadded<T> {
95    fn default() -> Self {
96        Self::new(T::default())
97    }
98}
99
100impl<T: Clone> Clone for CachePadded<T> {
101    fn clone(&self) -> Self {
102        Self::new(self.value.clone())
103    }
104}
105
impl<T: std::fmt::Debug> std::fmt::Debug for CachePadded<T> {
    // Formats as `CachePadded { value: .. }`, delegating to `T`'s `Debug`.
    // The padding itself is intentionally not shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CachePadded")
            .field("value", &self.value)
            .finish()
    }
}
113
/// A lock-free single-producer single-consumer bounded queue.
///
/// This queue is designed for high-performance inter-core communication.
/// It uses atomic operations with Acquire/Release ordering to ensure
/// correct synchronization without locks.
///
/// One slot is always kept empty to distinguish a full queue from an empty
/// one, so a queue with capacity `n` holds at most `n - 1` items.
///
/// # Safety
///
/// This queue is only safe when there is exactly one producer thread and
/// one consumer thread. Multiple producers or consumers will cause data races.
///
/// # Example
///
/// ```rust
/// use laminar_core::tpc::SpscQueue;
///
/// let queue: SpscQueue<i32> = SpscQueue::new(1024);
///
/// // Producer
/// assert!(queue.push(42).is_ok());
///
/// // Consumer
/// assert_eq!(queue.pop(), Some(42));
/// ```
pub struct SpscQueue<T> {
    /// Ring buffer storage. Slots in `[head, tail)` hold initialized values;
    /// every other slot is uninitialized.
    buffer: Box<[UnsafeCell<MaybeUninit<T>>]>,
    /// Head index (consumer reads from here). Padded so consumer-side stores
    /// never share a cache-line pair with `tail`.
    head: CachePadded<AtomicUsize>,
    /// Tail index (producer writes here).
    tail: CachePadded<AtomicUsize>,
    /// Capacity mask for fast modulo (capacity - 1; capacity is a power of 2)
    capacity_mask: usize,
}
148
// SAFETY: SpscQueue owns its buffer, so moving the queue to another thread
// moves any still-queued `T` values with it — hence `T: Send` is required.
#[allow(unsafe_code)]
unsafe impl<T: Send> Send for SpscQueue<T> {}

// SAFETY: SpscQueue can be shared between threads (one producer, one consumer)
// as long as T is Send. Items are handed off from producer to consumer and
// never accessed by both sides at once, so `T: Send` (rather than `T: Sync`)
// is the correct bound; the atomic operations ensure correct synchronization.
#[allow(unsafe_code)]
unsafe impl<T: Send> Sync for SpscQueue<T> {}
157
158impl<T> SpscQueue<T> {
159    /// Creates a new SPSC queue with the given capacity.
160    ///
161    /// The capacity will be rounded up to the next power of 2 for efficiency.
162    ///
163    /// # Panics
164    ///
165    /// Panics if capacity is 0 or would overflow when rounded to power of 2.
166    #[must_use]
167    pub fn new(capacity: usize) -> Self {
168        assert!(capacity > 0, "capacity must be > 0");
169
170        // Round up to next power of 2
171        let capacity = capacity.next_power_of_two();
172
173        // Allocate the buffer
174        let buffer: Vec<UnsafeCell<MaybeUninit<T>>> = (0..capacity)
175            .map(|_| UnsafeCell::new(MaybeUninit::uninit()))
176            .collect();
177
178        Self {
179            buffer: buffer.into_boxed_slice(),
180            head: CachePadded::new(AtomicUsize::new(0)),
181            tail: CachePadded::new(AtomicUsize::new(0)),
182            capacity_mask: capacity - 1,
183        }
184    }
185
186    /// Returns the capacity of the queue.
187    #[must_use]
188    pub fn capacity(&self) -> usize {
189        self.capacity_mask + 1
190    }
191
192    /// Returns true if the queue is empty.
193    ///
194    /// Note: This is a snapshot and may change immediately after returning.
195    #[must_use]
196    pub fn is_empty(&self) -> bool {
197        let head = self.head.load(Ordering::Acquire);
198        let tail = self.tail.load(Ordering::Acquire);
199        head == tail
200    }
201
202    /// Returns true if the queue is full.
203    ///
204    /// Note: This is a snapshot and may change immediately after returning.
205    #[must_use]
206    pub fn is_full(&self) -> bool {
207        let head = self.head.load(Ordering::Acquire);
208        let tail = self.tail.load(Ordering::Acquire);
209        self.next_index(tail) == head
210    }
211
212    /// Returns the current number of items in the queue.
213    ///
214    /// Note: This is a snapshot and may change immediately after returning.
215    #[must_use]
216    pub fn len(&self) -> usize {
217        let head = self.head.load(Ordering::Acquire);
218        let tail = self.tail.load(Ordering::Acquire);
219        tail.wrapping_sub(head) & self.capacity_mask
220    }
221
222    /// Push an item to the queue.
223    ///
224    /// Returns `Ok(())` if successful, or `Err(item)` if the queue is full.
225    ///
226    /// # Errors
227    ///
228    /// Returns the item back if the queue is full.
229    ///
230    /// # Safety
231    ///
232    /// This method must only be called by the single producer thread.
233    pub fn push(&self, item: T) -> Result<(), T> {
234        let tail = self.tail.load(Ordering::Relaxed);
235        let next_tail = self.next_index(tail);
236
237        // Check if queue is full
238        if next_tail == self.head.load(Ordering::Acquire) {
239            return Err(item);
240        }
241
242        // SAFETY: We have exclusive write access to this slot because:
243        // 1. We are the only producer
244        // 2. The consumer only reads slots where head < tail
245        // 3. We haven't published this slot yet (tail not updated)
246        #[allow(unsafe_code)]
247        unsafe {
248            (*self.buffer[tail].get()).write(item);
249        }
250
251        // Publish the item by updating tail
252        self.tail.store(next_tail, Ordering::Release);
253
254        Ok(())
255    }
256
257    /// Pop an item from the queue.
258    ///
259    /// Returns `Some(item)` if successful, or `None` if the queue is empty.
260    ///
261    /// # Safety
262    ///
263    /// This method must only be called by the single consumer thread.
264    pub fn pop(&self) -> Option<T> {
265        let head = self.head.load(Ordering::Relaxed);
266
267        // Check if queue is empty
268        if head == self.tail.load(Ordering::Acquire) {
269            return None;
270        }
271
272        // SAFETY: We have exclusive read access to this slot because:
273        // 1. We are the only consumer
274        // 2. The producer only writes to slots where tail > head
275        // 3. This slot has been published (we checked tail > head)
276        #[allow(unsafe_code)]
277        let item = unsafe { (*self.buffer[head].get()).assume_init_read() };
278
279        // Consume the item by updating head
280        self.head.store(self.next_index(head), Ordering::Release);
281
282        Some(item)
283    }
284
285    /// Push multiple items to the queue.
286    ///
287    /// Returns the number of items successfully pushed. Items are pushed
288    /// in order, stopping at the first failure.
289    ///
290    /// # Safety
291    ///
292    /// This method must only be called by the single producer thread.
293    pub fn push_batch(&self, items: impl IntoIterator<Item = T>) -> usize {
294        let mut count = 0;
295        for item in items {
296            if self.push(item).is_err() {
297                break;
298            }
299            count += 1;
300        }
301        count
302    }
303
304    /// Pop multiple items from the queue.
305    ///
306    /// Returns a vector of up to `max_count` items.
307    ///
308    /// # Safety
309    ///
310    /// This method must only be called by the single consumer thread.
311    ///
312    /// # Note
313    ///
314    /// This method allocates memory. For zero-allocation polling, use
315    /// [`pop_batch_into`](Self::pop_batch_into) or [`pop_each`](Self::pop_each) instead.
316    pub fn pop_batch(&self, max_count: usize) -> Vec<T> {
317        let mut items = Vec::with_capacity(max_count.min(self.len()));
318        for _ in 0..max_count {
319            if let Some(item) = self.pop() {
320                items.push(item);
321            } else {
322                break;
323            }
324        }
325        items
326    }
327
328    /// Pop multiple items into a caller-provided buffer (zero-allocation).
329    ///
330    /// Items are written to `buffer` starting at index 0. Returns the number
331    /// of items actually popped (0 if queue empty or buffer full).
332    ///
333    /// # Safety
334    ///
335    /// This method must only be called by the single consumer thread.
336    ///
337    /// After this method returns `n`, the first `n` elements of `buffer`
338    /// are initialized and can be safely read with `assume_init_read()`.
339    ///
340    /// # Example
341    ///
342    /// ```rust
343    /// use laminar_core::tpc::SpscQueue;
344    /// use std::mem::MaybeUninit;
345    ///
346    /// let queue: SpscQueue<i32> = SpscQueue::new(16);
347    /// queue.push(1).unwrap();
348    /// queue.push(2).unwrap();
349    ///
350    /// let mut buffer: [MaybeUninit<i32>; 8] = [MaybeUninit::uninit(); 8];
351    /// let count = queue.pop_batch_into(&mut buffer);
352    ///
353    /// assert_eq!(count, 2);
354    /// // SAFETY: We just initialized these elements
355    /// unsafe {
356    ///     assert_eq!(buffer[0].assume_init(), 1);
357    ///     assert_eq!(buffer[1].assume_init(), 2);
358    /// }
359    /// ```
360    #[inline]
361    pub fn pop_batch_into(&self, buffer: &mut [MaybeUninit<T>]) -> usize {
362        if buffer.is_empty() {
363            return 0;
364        }
365
366        let mut current_head = self.head.load(Ordering::Relaxed);
367        let tail = self.tail.load(Ordering::Acquire);
368
369        // Calculate available items (handle wrap-around)
370        let available = if tail >= current_head {
371            tail - current_head
372        } else {
373            (self.capacity_mask + 1) - current_head + tail
374        };
375
376        let count = available.min(buffer.len());
377
378        if count == 0 {
379            return 0;
380        }
381
382        // Copy items to buffer
383        for slot in buffer.iter_mut().take(count) {
384            // SAFETY: We have exclusive read access to slots between head and tail.
385            // The producer only writes to slots where tail > head, and we've verified
386            // that these slots contain valid data by checking tail.
387            #[allow(unsafe_code)]
388            unsafe {
389                let src = (*self.buffer[current_head].get()).assume_init_read();
390                slot.write(src);
391            }
392
393            // Advance head with wrap-around
394            current_head = self.next_index(current_head);
395        }
396
397        // Update head to release slots
398        self.head.store(current_head, Ordering::Release);
399
400        count
401    }
402
403    /// Pop items and call a callback for each one (zero-allocation).
404    ///
405    /// Processing stops when either:
406    /// - `max_count` items have been processed
407    /// - The queue becomes empty
408    /// - The callback returns `false`
409    ///
410    /// Returns the number of items processed.
411    ///
412    /// # Safety
413    ///
414    /// This method must only be called by the single consumer thread.
415    ///
416    /// # Example
417    ///
418    /// ```rust
419    /// use laminar_core::tpc::SpscQueue;
420    ///
421    /// let queue: SpscQueue<i32> = SpscQueue::new(16);
422    /// queue.push(1).unwrap();
423    /// queue.push(2).unwrap();
424    /// queue.push(3).unwrap();
425    ///
426    /// let mut sum = 0;
427    /// let count = queue.pop_each(10, |item| {
428    ///     sum += item;
429    ///     true // Continue processing
430    /// });
431    ///
432    /// assert_eq!(count, 3);
433    /// assert_eq!(sum, 6);
434    /// ```
435    #[inline]
436    pub fn pop_each<F>(&self, max_count: usize, mut f: F) -> usize
437    where
438        F: FnMut(T) -> bool,
439    {
440        if max_count == 0 {
441            return 0;
442        }
443
444        let mut current_head = self.head.load(Ordering::Relaxed);
445        let tail = self.tail.load(Ordering::Acquire);
446
447        // Calculate available items (handle wrap-around)
448        let available = if tail >= current_head {
449            tail - current_head
450        } else {
451            (self.capacity_mask + 1) - current_head + tail
452        };
453
454        let to_pop = available.min(max_count);
455
456        if to_pop == 0 {
457            return 0;
458        }
459
460        let mut popped = 0;
461        for _ in 0..to_pop {
462            // SAFETY: We have exclusive read access to slots between head and tail.
463            #[allow(unsafe_code)]
464            let item = unsafe { (*self.buffer[current_head].get()).assume_init_read() };
465
466            popped += 1;
467
468            // Advance head with wrap-around
469            current_head = self.next_index(current_head);
470
471            // Call the callback; stop if it returns false
472            if !f(item) {
473                break;
474            }
475        }
476
477        // Update head to release processed slots
478        if popped > 0 {
479            self.head.store(current_head, Ordering::Release);
480        }
481
482        popped
483    }
484
485    /// Peek at the next item without removing it.
486    ///
487    /// Returns `None` if the queue is empty.
488    ///
489    /// # Safety
490    ///
491    /// This method must only be called by the single consumer thread.
492    pub fn peek(&self) -> Option<&T> {
493        let head = self.head.load(Ordering::Relaxed);
494
495        if head == self.tail.load(Ordering::Acquire) {
496            return None;
497        }
498
499        // SAFETY: Same reasoning as pop() - we have exclusive read access
500        #[allow(unsafe_code)]
501        unsafe {
502            Some((*self.buffer[head].get()).assume_init_ref())
503        }
504    }
505
506    /// Calculate the next index with wrap-around.
507    #[inline]
508    const fn next_index(&self, index: usize) -> usize {
509        (index + 1) & self.capacity_mask
510    }
511}
512
513impl<T> Drop for SpscQueue<T> {
514    fn drop(&mut self) {
515        // Drop any remaining items in the queue
516        while self.pop().is_some() {}
517    }
518}
519
impl<T: std::fmt::Debug> std::fmt::Debug for SpscQueue<T> {
    // Shows only capacity and a snapshot of len; the queued items themselves
    // are intentionally not printed (reading them from an arbitrary thread
    // would violate the single-consumer contract).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SpscQueue")
            .field("capacity", &self.capacity())
            .field("len", &self.len())
            .finish()
    }
}
528
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    #[test]
    fn test_cache_padded_size() {
        // Verify CachePadded provides 128-byte alignment (two cache lines)
        // to prevent Intel L2 spatial prefetcher false sharing.
        // assert_eq! (rather than assert!(a == b)) reports both values on failure.
        assert_eq!(std::mem::align_of::<CachePadded<AtomicUsize>>(), 128);
        // Alignment rounds the size up to a full 128-byte multiple.
        assert_eq!(std::mem::size_of::<CachePadded<AtomicUsize>>(), 128);
    }

    #[test]
    fn test_cache_padded_operations() {
        let padded = CachePadded::new(42u32);
        assert_eq!(*padded, 42);
        assert_eq!(*padded.get(), 42);

        let mut padded = CachePadded::new(42u32);
        *padded.get_mut() = 100;
        assert_eq!(*padded, 100);

        let inner = padded.into_inner();
        assert_eq!(inner, 100);
    }

    #[test]
    fn test_cache_padded_default() {
        let padded: CachePadded<u32> = CachePadded::default();
        assert_eq!(*padded, 0);
    }

    #[test]
    fn test_cache_padded_clone() {
        let padded = CachePadded::new(42u32);
        let cloned = padded.clone();
        assert_eq!(*cloned, 42);
    }

    #[test]
    fn test_new_queue() {
        let queue: SpscQueue<i32> = SpscQueue::new(100);
        // Should round up to 128
        assert_eq!(queue.capacity(), 128);
        assert!(queue.is_empty());
        assert!(!queue.is_full());
        assert_eq!(queue.len(), 0);
    }

    #[test]
    fn test_push_pop() {
        let queue: SpscQueue<i32> = SpscQueue::new(4);

        assert!(queue.push(1).is_ok());
        assert!(queue.push(2).is_ok());
        assert!(queue.push(3).is_ok());
        // Queue of capacity 4 can only hold 3 items (one slot reserved)
        assert!(queue.is_full());
        assert!(queue.push(4).is_err());

        assert_eq!(queue.pop(), Some(1));
        assert_eq!(queue.pop(), Some(2));
        assert_eq!(queue.pop(), Some(3));
        assert_eq!(queue.pop(), None);
        assert!(queue.is_empty());
    }

    #[test]
    fn test_fifo_order() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);

        for i in 0..10 {
            assert!(queue.push(i).is_ok());
        }

        for i in 0..10 {
            assert_eq!(queue.pop(), Some(i));
        }
    }

    #[test]
    fn test_wrap_around() {
        let queue: SpscQueue<i32> = SpscQueue::new(4);

        // Fill and empty multiple times to test wrap-around
        for iteration in 0..5 {
            for i in 0..3 {
                assert!(queue.push(iteration * 10 + i).is_ok());
            }
            for i in 0..3 {
                assert_eq!(queue.pop(), Some(iteration * 10 + i));
            }
        }
    }

    #[test]
    fn test_peek() {
        let queue: SpscQueue<i32> = SpscQueue::new(4);

        assert!(queue.peek().is_none());

        queue.push(42).unwrap();
        assert_eq!(queue.peek(), Some(&42));
        assert_eq!(queue.peek(), Some(&42)); // Still there

        assert_eq!(queue.pop(), Some(42));
        assert!(queue.peek().is_none());
    }

    #[test]
    fn test_push_batch() {
        let queue: SpscQueue<i32> = SpscQueue::new(8);

        let pushed = queue.push_batch(vec![1, 2, 3, 4, 5]);
        assert_eq!(pushed, 5);
        assert_eq!(queue.len(), 5);

        // Try to push more than capacity
        let pushed = queue.push_batch(vec![6, 7, 8, 9, 10]);
        assert_eq!(pushed, 2); // Only 2 more fit (7 slots max, 5 used)
    }

    #[test]
    fn test_pop_batch() {
        let queue: SpscQueue<i32> = SpscQueue::new(8);

        queue.push_batch(vec![1, 2, 3, 4, 5]);

        let items = queue.pop_batch(3);
        assert_eq!(items, vec![1, 2, 3]);
        assert_eq!(queue.len(), 2);

        let items = queue.pop_batch(10); // Request more than available
        assert_eq!(items, vec![4, 5]);
        assert!(queue.is_empty());
    }

    #[test]
    fn test_concurrent_producer_consumer() {
        const ITEMS: i32 = 10_000;
        let queue = Arc::new(SpscQueue::<i32>::new(1024));
        let queue_producer = Arc::clone(&queue);
        let queue_consumer = Arc::clone(&queue);

        // Producer thread
        let producer = thread::spawn(move || {
            for i in 0..ITEMS {
                while queue_producer.push(i).is_err() {
                    thread::yield_now();
                }
            }
        });

        // Consumer thread
        let consumer = thread::spawn(move || {
            let mut received = Vec::with_capacity(ITEMS as usize);
            while received.len() < ITEMS as usize {
                if let Some(item) = queue_consumer.pop() {
                    received.push(item);
                } else {
                    thread::yield_now();
                }
            }
            received
        });

        producer.join().unwrap();
        let received = consumer.join().unwrap();

        // Verify all items received in order
        assert_eq!(received.len(), ITEMS as usize);
        for (i, &item) in received.iter().enumerate() {
            assert_eq!(
                item,
                i32::try_from(i).unwrap(),
                "Item out of order at index {i}"
            );
        }
    }

    /// Increments the shared counter each time an instance is dropped.
    #[derive(Debug)]
    struct DropCounter(Arc<AtomicUsize>);

    impl Drop for DropCounter {
        fn drop(&mut self) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }

    #[test]
    fn test_drop() {
        // `Arc` and `AtomicUsize` are already in scope via the module-level
        // imports above; the previous function-local `use` lines were
        // redundant and have been removed.
        let drop_count = Arc::new(AtomicUsize::new(0));

        {
            let queue: SpscQueue<DropCounter> = SpscQueue::new(8);
            for _ in 0..5 {
                queue.push(DropCounter(Arc::clone(&drop_count))).unwrap();
            }
            // Pop 2 items
            queue.pop();
            queue.pop();
            // Queue drops with 3 items remaining
        }

        // All 5 items should be dropped (2 popped + 3 on drop)
        assert_eq!(drop_count.load(Ordering::SeqCst), 5);
    }

    #[test]
    fn test_debug() {
        let queue: SpscQueue<i32> = SpscQueue::new(8);
        queue.push(1).unwrap();
        queue.push(2).unwrap();

        let debug_str = format!("{queue:?}");
        assert!(debug_str.contains("SpscQueue"));
        assert!(debug_str.contains("capacity"));
        assert!(debug_str.contains("len"));
    }

    #[test]
    #[should_panic(expected = "capacity must be > 0")]
    fn test_zero_capacity_panics() {
        let _: SpscQueue<i32> = SpscQueue::new(0);
    }

    #[test]
    fn test_pop_batch_into() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);

        // Push some items
        queue.push(1).unwrap();
        queue.push(2).unwrap();
        queue.push(3).unwrap();

        // Pop into buffer
        let mut buffer: [MaybeUninit<i32>; 8] = [MaybeUninit::uninit(); 8];
        let count = queue.pop_batch_into(&mut buffer);

        assert_eq!(count, 3);

        // SAFETY: We just initialized these elements
        #[allow(unsafe_code)]
        unsafe {
            assert_eq!(buffer[0].assume_init(), 1);
            assert_eq!(buffer[1].assume_init(), 2);
            assert_eq!(buffer[2].assume_init(), 3);
        }

        assert!(queue.is_empty());
    }

    #[test]
    fn test_pop_batch_into_partial() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);

        // Push 5 items
        for i in 0..5 {
            queue.push(i).unwrap();
        }

        // Pop only 3 (buffer smaller than available)
        let mut buffer: [MaybeUninit<i32>; 3] = [MaybeUninit::uninit(); 3];
        let count = queue.pop_batch_into(&mut buffer);

        assert_eq!(count, 3);
        assert_eq!(queue.len(), 2); // 2 items remaining

        // SAFETY: We just initialized these elements
        #[allow(unsafe_code)]
        unsafe {
            assert_eq!(buffer[0].assume_init(), 0);
            assert_eq!(buffer[1].assume_init(), 1);
            assert_eq!(buffer[2].assume_init(), 2);
        }
    }

    #[test]
    fn test_pop_batch_into_empty() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);

        let mut buffer: [MaybeUninit<i32>; 8] = [MaybeUninit::uninit(); 8];
        let count = queue.pop_batch_into(&mut buffer);

        assert_eq!(count, 0);
    }

    #[test]
    fn test_pop_batch_into_empty_buffer() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);
        queue.push(1).unwrap();

        let mut buffer: [MaybeUninit<i32>; 0] = [];
        let count = queue.pop_batch_into(&mut buffer);

        assert_eq!(count, 0);
        assert_eq!(queue.len(), 1); // Item still in queue
    }

    #[test]
    fn test_pop_each() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);

        queue.push(1).unwrap();
        queue.push(2).unwrap();
        queue.push(3).unwrap();

        let mut sum = 0;
        let count = queue.pop_each(10, |item| {
            sum += item;
            true
        });

        assert_eq!(count, 3);
        assert_eq!(sum, 6);
        assert!(queue.is_empty());
    }

    #[test]
    fn test_pop_each_early_stop() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);

        queue.push(1).unwrap();
        queue.push(2).unwrap();
        queue.push(3).unwrap();
        queue.push(4).unwrap();
        queue.push(5).unwrap();

        let mut items = Vec::new();
        let count = queue.pop_each(10, |item| {
            items.push(item);
            item < 3 // Stop after item 3
        });

        assert_eq!(count, 3); // Processed 1, 2, 3
        assert_eq!(items, vec![1, 2, 3]);
        assert_eq!(queue.len(), 2); // 4, 5 remaining
    }

    #[test]
    fn test_pop_each_max_count() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);

        for i in 0..10 {
            queue.push(i).unwrap();
        }

        let mut count_processed = 0;
        let count = queue.pop_each(5, |_| {
            count_processed += 1;
            true
        });

        assert_eq!(count, 5);
        assert_eq!(count_processed, 5);
        assert_eq!(queue.len(), 5); // 5 remaining
    }

    #[test]
    fn test_pop_each_empty() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);

        let mut called = false;
        let count = queue.pop_each(10, |_| {
            called = true;
            true
        });

        assert_eq!(count, 0);
        assert!(!called);
    }

    #[test]
    fn test_pop_each_zero_max() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);
        queue.push(1).unwrap();

        let count = queue.pop_each(0, |_| true);

        assert_eq!(count, 0);
        assert_eq!(queue.len(), 1); // Item still in queue
    }

    #[test]
    fn test_pop_batch_into_wrap_around() {
        let queue: SpscQueue<i32> = SpscQueue::new(4); // Capacity 4

        // Fill and empty to advance indices
        for _ in 0..3 {
            for i in 0..3 {
                queue.push(i).unwrap();
            }
            for _ in 0..3 {
                queue.pop();
            }
        }

        // Now indices are wrapped, push new items
        queue.push(10).unwrap();
        queue.push(11).unwrap();

        let mut buffer: [MaybeUninit<i32>; 4] = [MaybeUninit::uninit(); 4];
        let count = queue.pop_batch_into(&mut buffer);

        assert_eq!(count, 2);

        #[allow(unsafe_code)]
        unsafe {
            assert_eq!(buffer[0].assume_init(), 10);
            assert_eq!(buffer[1].assume_init(), 11);
        }
    }

    #[test]
    fn test_pop_each_wrap_around() {
        let queue: SpscQueue<i32> = SpscQueue::new(4);

        // Fill and empty to advance indices
        for _ in 0..3 {
            for i in 0..3 {
                queue.push(i).unwrap();
            }
            let _ = queue.pop_batch(3);
        }

        // Now push with wrapped indices
        queue.push(100).unwrap();
        queue.push(200).unwrap();

        let mut items = Vec::new();
        queue.pop_each(10, |item| {
            items.push(item);
            true
        });

        assert_eq!(items, vec![100, 200]);
    }
}