1use std::cell::UnsafeCell;
18use std::mem::MaybeUninit;
19use std::sync::atomic::{AtomicUsize, Ordering};
20
/// Pads and aligns a value to a 128-byte boundary to prevent false sharing
/// when adjacent values are mutated from different CPU cores.
///
/// NOTE(review): 128 (rather than 64) presumably covers the 128-byte
/// destructive-interference size of some modern CPUs — confirm the target
/// platforms before shrinking it.
#[repr(C, align(128))]
pub struct CachePadded<T> {
    // The wrapped value; the padding comes from the `align(128)` attribute.
    value: T,
}
46
#[allow(unsafe_code)]
// SAFETY: `CachePadded<T>` is a plain wrapper holding only a `T` (plus
// alignment padding), so it can move between threads whenever `T: Send`.
unsafe impl<T: Send> Send for CachePadded<T> {}

#[allow(unsafe_code)]
// SAFETY: shared access to `CachePadded<T>` only ever exposes `&T`, so it
// can be shared between threads whenever `T: Sync`.
unsafe impl<T: Sync> Sync for CachePadded<T> {}
54
55impl<T> CachePadded<T> {
56 #[must_use]
58 pub const fn new(value: T) -> Self {
59 Self { value }
60 }
61
62 #[must_use]
64 pub const fn get(&self) -> &T {
65 &self.value
66 }
67
68 pub fn get_mut(&mut self) -> &mut T {
70 &mut self.value
71 }
72
73 #[must_use]
75 pub fn into_inner(self) -> T {
76 self.value
77 }
78}
79
80impl<T> std::ops::Deref for CachePadded<T> {
81 type Target = T;
82
83 fn deref(&self) -> &Self::Target {
84 &self.value
85 }
86}
87
88impl<T> std::ops::DerefMut for CachePadded<T> {
89 fn deref_mut(&mut self) -> &mut Self::Target {
90 &mut self.value
91 }
92}
93
94impl<T: Default> Default for CachePadded<T> {
95 fn default() -> Self {
96 Self::new(T::default())
97 }
98}
99
100impl<T: Clone> Clone for CachePadded<T> {
101 fn clone(&self) -> Self {
102 Self::new(self.value.clone())
103 }
104}
105
106impl<T: std::fmt::Debug> std::fmt::Debug for CachePadded<T> {
107 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108 f.debug_struct("CachePadded")
109 .field("value", &self.value)
110 .finish()
111 }
112}
113
/// A bounded, lock-free single-producer single-consumer (SPSC) ring buffer.
pub struct SpscQueue<T> {
    /// Ring storage; only the slots in `[head, tail)` hold initialized values.
    buffer: Box<[UnsafeCell<MaybeUninit<T>>]>,
    /// Consumer cursor: next slot to pop. Padded to avoid false sharing
    /// with `tail`.
    head: CachePadded<AtomicUsize>,
    /// Producer cursor: next slot to write. Padded to avoid false sharing
    /// with `head`.
    tail: CachePadded<AtomicUsize>,
    /// `capacity - 1`; capacity is a power of two, so this masks indices.
    capacity_mask: usize,
}
148
#[allow(unsafe_code)]
// SAFETY: the queue owns its elements, so moving the whole queue to another
// thread is sound whenever the element type is `Send`.
unsafe impl<T: Send> Send for SpscQueue<T> {}

#[allow(unsafe_code)]
// SAFETY: cross-thread access is coordinated through the atomic head/tail
// cursors; elements are transferred (never shared) between producer and
// consumer, so `T: Send` suffices.
// NOTE(review): soundness also relies on the SPSC usage contract — at most
// one thread pushing and one thread popping at a time — which the type
// itself cannot enforce; confirm all call sites respect it.
unsafe impl<T: Send> Sync for SpscQueue<T> {}
157
158impl<T> SpscQueue<T> {
159 #[must_use]
167 pub fn new(capacity: usize) -> Self {
168 assert!(capacity > 0, "capacity must be > 0");
169
170 let capacity = capacity.next_power_of_two();
172
173 let buffer: Vec<UnsafeCell<MaybeUninit<T>>> = (0..capacity)
175 .map(|_| UnsafeCell::new(MaybeUninit::uninit()))
176 .collect();
177
178 Self {
179 buffer: buffer.into_boxed_slice(),
180 head: CachePadded::new(AtomicUsize::new(0)),
181 tail: CachePadded::new(AtomicUsize::new(0)),
182 capacity_mask: capacity - 1,
183 }
184 }
185
186 #[must_use]
188 pub fn capacity(&self) -> usize {
189 self.capacity_mask + 1
190 }
191
192 #[must_use]
196 pub fn is_empty(&self) -> bool {
197 let head = self.head.load(Ordering::Acquire);
198 let tail = self.tail.load(Ordering::Acquire);
199 head == tail
200 }
201
202 #[must_use]
206 pub fn is_full(&self) -> bool {
207 let head = self.head.load(Ordering::Acquire);
208 let tail = self.tail.load(Ordering::Acquire);
209 self.next_index(tail) == head
210 }
211
212 #[must_use]
216 pub fn len(&self) -> usize {
217 let head = self.head.load(Ordering::Acquire);
218 let tail = self.tail.load(Ordering::Acquire);
219 tail.wrapping_sub(head) & self.capacity_mask
220 }
221
222 pub fn push(&self, item: T) -> Result<(), T> {
234 let tail = self.tail.load(Ordering::Relaxed);
235 let next_tail = self.next_index(tail);
236
237 if next_tail == self.head.load(Ordering::Acquire) {
239 return Err(item);
240 }
241
242 #[allow(unsafe_code)]
247 unsafe {
248 (*self.buffer[tail].get()).write(item);
249 }
250
251 self.tail.store(next_tail, Ordering::Release);
253
254 Ok(())
255 }
256
257 pub fn pop(&self) -> Option<T> {
265 let head = self.head.load(Ordering::Relaxed);
266
267 if head == self.tail.load(Ordering::Acquire) {
269 return None;
270 }
271
272 #[allow(unsafe_code)]
277 let item = unsafe { (*self.buffer[head].get()).assume_init_read() };
278
279 self.head.store(self.next_index(head), Ordering::Release);
281
282 Some(item)
283 }
284
285 pub fn push_batch(&self, items: impl IntoIterator<Item = T>) -> usize {
294 let mut count = 0;
295 for item in items {
296 if self.push(item).is_err() {
297 break;
298 }
299 count += 1;
300 }
301 count
302 }
303
304 pub fn pop_batch(&self, max_count: usize) -> Vec<T> {
317 let mut items = Vec::with_capacity(max_count.min(self.len()));
318 for _ in 0..max_count {
319 if let Some(item) = self.pop() {
320 items.push(item);
321 } else {
322 break;
323 }
324 }
325 items
326 }
327
328 #[inline]
361 pub fn pop_batch_into(&self, buffer: &mut [MaybeUninit<T>]) -> usize {
362 if buffer.is_empty() {
363 return 0;
364 }
365
366 let mut current_head = self.head.load(Ordering::Relaxed);
367 let tail = self.tail.load(Ordering::Acquire);
368
369 let available = if tail >= current_head {
371 tail - current_head
372 } else {
373 (self.capacity_mask + 1) - current_head + tail
374 };
375
376 let count = available.min(buffer.len());
377
378 if count == 0 {
379 return 0;
380 }
381
382 for slot in buffer.iter_mut().take(count) {
384 #[allow(unsafe_code)]
388 unsafe {
389 let src = (*self.buffer[current_head].get()).assume_init_read();
390 slot.write(src);
391 }
392
393 current_head = self.next_index(current_head);
395 }
396
397 self.head.store(current_head, Ordering::Release);
399
400 count
401 }
402
403 #[inline]
436 pub fn pop_each<F>(&self, max_count: usize, mut f: F) -> usize
437 where
438 F: FnMut(T) -> bool,
439 {
440 if max_count == 0 {
441 return 0;
442 }
443
444 let mut current_head = self.head.load(Ordering::Relaxed);
445 let tail = self.tail.load(Ordering::Acquire);
446
447 let available = if tail >= current_head {
449 tail - current_head
450 } else {
451 (self.capacity_mask + 1) - current_head + tail
452 };
453
454 let to_pop = available.min(max_count);
455
456 if to_pop == 0 {
457 return 0;
458 }
459
460 let mut popped = 0;
461 for _ in 0..to_pop {
462 #[allow(unsafe_code)]
464 let item = unsafe { (*self.buffer[current_head].get()).assume_init_read() };
465
466 popped += 1;
467
468 current_head = self.next_index(current_head);
470
471 if !f(item) {
473 break;
474 }
475 }
476
477 if popped > 0 {
479 self.head.store(current_head, Ordering::Release);
480 }
481
482 popped
483 }
484
485 pub fn peek(&self) -> Option<&T> {
493 let head = self.head.load(Ordering::Relaxed);
494
495 if head == self.tail.load(Ordering::Acquire) {
496 return None;
497 }
498
499 #[allow(unsafe_code)]
501 unsafe {
502 Some((*self.buffer[head].get()).assume_init_ref())
503 }
504 }
505
506 #[inline]
508 const fn next_index(&self, index: usize) -> usize {
509 (index + 1) & self.capacity_mask
510 }
511}
512
513impl<T> Drop for SpscQueue<T> {
514 fn drop(&mut self) {
515 while self.pop().is_some() {}
517 }
518}
519
520impl<T: std::fmt::Debug> std::fmt::Debug for SpscQueue<T> {
521 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
522 f.debug_struct("SpscQueue")
523 .field("capacity", &self.capacity())
524 .field("len", &self.len())
525 .finish()
526 }
527}
528
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    // The 128-byte alignment is the whole point of `CachePadded`.
    #[test]
    fn test_cache_padded_size() {
        assert!(std::mem::align_of::<CachePadded<AtomicUsize>>() == 128);
    }

    // Exercises new/get/get_mut/into_inner plus `Deref`.
    #[test]
    fn test_cache_padded_operations() {
        let padded = CachePadded::new(42u32);
        assert_eq!(*padded, 42);
        assert_eq!(*padded.get(), 42);

        let mut padded = CachePadded::new(42u32);
        *padded.get_mut() = 100;
        assert_eq!(*padded, 100);

        let inner = padded.into_inner();
        assert_eq!(inner, 100);
    }

    #[test]
    fn test_cache_padded_default() {
        let padded: CachePadded<u32> = CachePadded::default();
        assert_eq!(*padded, 0);
    }

    #[test]
    fn test_cache_padded_clone() {
        let padded = CachePadded::new(42u32);
        let cloned = padded.clone();
        assert_eq!(*cloned, 42);
    }

    // Requested capacity is rounded up to the next power of two.
    #[test]
    fn test_new_queue() {
        let queue: SpscQueue<i32> = SpscQueue::new(100);
        assert_eq!(queue.capacity(), 128);
        assert!(queue.is_empty());
        assert!(!queue.is_full());
        assert_eq!(queue.len(), 0);
    }

    // One slot is reserved, so a capacity-4 queue holds only 3 items.
    #[test]
    fn test_push_pop() {
        let queue: SpscQueue<i32> = SpscQueue::new(4);

        assert!(queue.push(1).is_ok());
        assert!(queue.push(2).is_ok());
        assert!(queue.push(3).is_ok());
        assert!(queue.is_full());
        assert!(queue.push(4).is_err());

        assert_eq!(queue.pop(), Some(1));
        assert_eq!(queue.pop(), Some(2));
        assert_eq!(queue.pop(), Some(3));
        assert_eq!(queue.pop(), None);
        assert!(queue.is_empty());
    }

    #[test]
    fn test_fifo_order() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);

        for i in 0..10 {
            assert!(queue.push(i).is_ok());
        }

        for i in 0..10 {
            assert_eq!(queue.pop(), Some(i));
        }
    }

    // Repeated fill/drain cycles force head/tail past the end of the ring.
    #[test]
    fn test_wrap_around() {
        let queue: SpscQueue<i32> = SpscQueue::new(4);

        for iteration in 0..5 {
            for i in 0..3 {
                assert!(queue.push(iteration * 10 + i).is_ok());
            }
            for i in 0..3 {
                assert_eq!(queue.pop(), Some(iteration * 10 + i));
            }
        }
    }

    // `peek` must not consume the front item, however often it is called.
    #[test]
    fn test_peek() {
        let queue: SpscQueue<i32> = SpscQueue::new(4);

        assert!(queue.peek().is_none());

        queue.push(42).unwrap();
        assert_eq!(queue.peek(), Some(&42));
        assert_eq!(queue.peek(), Some(&42));
        assert_eq!(queue.pop(), Some(42));
        assert!(queue.peek().is_none());
    }

    // Capacity 8 means 7 usable slots, so the second batch only fits 2.
    #[test]
    fn test_push_batch() {
        let queue: SpscQueue<i32> = SpscQueue::new(8);

        let pushed = queue.push_batch(vec![1, 2, 3, 4, 5]);
        assert_eq!(pushed, 5);
        assert_eq!(queue.len(), 5);

        let pushed = queue.push_batch(vec![6, 7, 8, 9, 10]);
        assert_eq!(pushed, 2);
    }

    #[test]
    fn test_pop_batch() {
        let queue: SpscQueue<i32> = SpscQueue::new(8);

        queue.push_batch(vec![1, 2, 3, 4, 5]);

        let items = queue.pop_batch(3);
        assert_eq!(items, vec![1, 2, 3]);
        assert_eq!(queue.len(), 2);

        // Asking for more than is available returns only what is left.
        let items = queue.pop_batch(10);
        assert_eq!(items, vec![4, 5]);
        assert!(queue.is_empty());
    }

    // One producer thread and one consumer thread — the supported usage.
    #[test]
    fn test_concurrent_producer_consumer() {
        const ITEMS: i32 = 10_000;
        let queue = Arc::new(SpscQueue::<i32>::new(1024));
        let queue_producer = Arc::clone(&queue);
        let queue_consumer = Arc::clone(&queue);

        let producer = thread::spawn(move || {
            for i in 0..ITEMS {
                while queue_producer.push(i).is_err() {
                    thread::yield_now();
                }
            }
        });

        let consumer = thread::spawn(move || {
            let mut received = Vec::with_capacity(ITEMS as usize);
            while received.len() < ITEMS as usize {
                if let Some(item) = queue_consumer.pop() {
                    received.push(item);
                } else {
                    thread::yield_now();
                }
            }
            received
        });

        producer.join().unwrap();
        let received = consumer.join().unwrap();

        assert_eq!(received.len(), ITEMS as usize);
        for (i, &item) in received.iter().enumerate() {
            assert_eq!(
                item,
                i32::try_from(i).unwrap(),
                "Item out of order at index {i}"
            );
        }
    }

    // Counts destructor runs so we can verify `Drop` drains leftovers.
    #[derive(Debug)]
    struct DropCounter(Arc<AtomicUsize>);

    impl Drop for DropCounter {
        fn drop(&mut self) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }

    // 2 popped + 3 left in the queue: all 5 must drop exactly once.
    #[test]
    fn test_drop() {
        use std::sync::atomic::AtomicUsize;
        use std::sync::Arc;

        let drop_count = Arc::new(AtomicUsize::new(0));

        {
            let queue: SpscQueue<DropCounter> = SpscQueue::new(8);
            for _ in 0..5 {
                queue.push(DropCounter(Arc::clone(&drop_count))).unwrap();
            }
            queue.pop();
            queue.pop();
        }

        assert_eq!(drop_count.load(Ordering::SeqCst), 5);
    }

    #[test]
    fn test_debug() {
        let queue: SpscQueue<i32> = SpscQueue::new(8);
        queue.push(1).unwrap();
        queue.push(2).unwrap();

        let debug_str = format!("{queue:?}");
        assert!(debug_str.contains("SpscQueue"));
        assert!(debug_str.contains("capacity"));
        assert!(debug_str.contains("len"));
    }

    #[test]
    #[should_panic(expected = "capacity must be > 0")]
    fn test_zero_capacity_panics() {
        let _: SpscQueue<i32> = SpscQueue::new(0);
    }

    #[test]
    fn test_pop_batch_into() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);

        queue.push(1).unwrap();
        queue.push(2).unwrap();
        queue.push(3).unwrap();

        let mut buffer: [MaybeUninit<i32>; 8] = [MaybeUninit::uninit(); 8];
        let count = queue.pop_batch_into(&mut buffer);

        assert_eq!(count, 3);

        #[allow(unsafe_code)]
        // SAFETY: the first `count` slots were initialized by `pop_batch_into`.
        unsafe {
            assert_eq!(buffer[0].assume_init(), 1);
            assert_eq!(buffer[1].assume_init(), 2);
            assert_eq!(buffer[2].assume_init(), 3);
        }

        assert!(queue.is_empty());
    }

    // Buffer smaller than queue contents: only `buffer.len()` items move.
    #[test]
    fn test_pop_batch_into_partial() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);

        for i in 0..5 {
            queue.push(i).unwrap();
        }

        let mut buffer: [MaybeUninit<i32>; 3] = [MaybeUninit::uninit(); 3];
        let count = queue.pop_batch_into(&mut buffer);

        assert_eq!(count, 3);
        assert_eq!(queue.len(), 2);

        #[allow(unsafe_code)]
        // SAFETY: the first `count` slots were initialized by `pop_batch_into`.
        unsafe {
            assert_eq!(buffer[0].assume_init(), 0);
            assert_eq!(buffer[1].assume_init(), 1);
            assert_eq!(buffer[2].assume_init(), 2);
        }
    }

    #[test]
    fn test_pop_batch_into_empty() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);

        let mut buffer: [MaybeUninit<i32>; 8] = [MaybeUninit::uninit(); 8];
        let count = queue.pop_batch_into(&mut buffer);

        assert_eq!(count, 0);
    }

    // A zero-length destination buffer must leave the queue untouched.
    #[test]
    fn test_pop_batch_into_empty_buffer() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);
        queue.push(1).unwrap();

        let mut buffer: [MaybeUninit<i32>; 0] = [];
        let count = queue.pop_batch_into(&mut buffer);

        assert_eq!(count, 0);
        assert_eq!(queue.len(), 1);
    }

    #[test]
    fn test_pop_each() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);

        queue.push(1).unwrap();
        queue.push(2).unwrap();
        queue.push(3).unwrap();

        let mut sum = 0;
        let count = queue.pop_each(10, |item| {
            sum += item;
            true
        });

        assert_eq!(count, 3);
        assert_eq!(sum, 6);
        assert!(queue.is_empty());
    }

    // Returning `false` stops iteration, but that item is still consumed.
    #[test]
    fn test_pop_each_early_stop() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);

        queue.push(1).unwrap();
        queue.push(2).unwrap();
        queue.push(3).unwrap();
        queue.push(4).unwrap();
        queue.push(5).unwrap();

        let mut items = Vec::new();
        let count = queue.pop_each(10, |item| {
            items.push(item);
            item < 3
        });

        assert_eq!(count, 3);
        assert_eq!(items, vec![1, 2, 3]);
        assert_eq!(queue.len(), 2);
    }

    #[test]
    fn test_pop_each_max_count() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);

        for i in 0..10 {
            queue.push(i).unwrap();
        }

        let mut count_processed = 0;
        let count = queue.pop_each(5, |_| {
            count_processed += 1;
            true
        });

        assert_eq!(count, 5);
        assert_eq!(count_processed, 5);
        assert_eq!(queue.len(), 5);
    }

    #[test]
    fn test_pop_each_empty() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);

        let mut called = false;
        let count = queue.pop_each(10, |_| {
            called = true;
            true
        });

        assert_eq!(count, 0);
        assert!(!called);
    }

    #[test]
    fn test_pop_each_zero_max() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);
        queue.push(1).unwrap();

        let count = queue.pop_each(0, |_| true);

        assert_eq!(count, 0);
        assert_eq!(queue.len(), 1);
    }

    // Drive the cursors past the wrap point, then batch-pop across it.
    #[test]
    fn test_pop_batch_into_wrap_around() {
        let queue: SpscQueue<i32> = SpscQueue::new(4);

        for _ in 0..3 {
            for i in 0..3 {
                queue.push(i).unwrap();
            }
            for _ in 0..3 {
                queue.pop();
            }
        }

        queue.push(10).unwrap();
        queue.push(11).unwrap();

        let mut buffer: [MaybeUninit<i32>; 4] = [MaybeUninit::uninit(); 4];
        let count = queue.pop_batch_into(&mut buffer);

        assert_eq!(count, 2);

        #[allow(unsafe_code)]
        // SAFETY: the first `count` slots were initialized by `pop_batch_into`.
        unsafe {
            assert_eq!(buffer[0].assume_init(), 10);
            assert_eq!(buffer[1].assume_init(), 11);
        }
    }

    // Same wrap-around scenario, exercised through `pop_each`.
    #[test]
    fn test_pop_each_wrap_around() {
        let queue: SpscQueue<i32> = SpscQueue::new(4);

        for _ in 0..3 {
            for i in 0..3 {
                queue.push(i).unwrap();
            }
            let _ = queue.pop_batch(3);
        }

        queue.push(100).unwrap();
        queue.push(200).unwrap();

        let mut items = Vec::new();
        queue.pop_each(10, |item| {
            items.push(item);
            true
        });

        assert_eq!(items, vec![100, 200]);
    }
}