use core::{cell::UnsafeCell, fmt, hash, marker::PhantomData, mem::MaybeUninit, ptr};
use generic_array::{ArrayLength, GenericArray};
use hash32;
use crate::sealed::spsc as sealed;
pub use split::{Consumer, Producer};
mod split;
pub struct MultiCore;
pub struct SingleCore;
pub(crate) struct Atomic<U, C> {
v: UnsafeCell<U>,
c: PhantomData<C>,
}
impl<U, C> Atomic<U, C> {
pub(crate) const fn new(v: U) -> Self {
Atomic {
v: UnsafeCell::new(v),
c: PhantomData,
}
}
}
impl<U, C> Atomic<U, C>
where
U: sealed::Uxx,
C: sealed::XCore,
{
fn get(&self) -> &U {
unsafe { &*self.v.get() }
}
fn get_mut(&mut self) -> &mut U {
unsafe { &mut *self.v.get() }
}
fn load_acquire(&self) -> U {
unsafe { U::load_acquire::<C>(self.v.get()) }
}
fn load_relaxed(&self) -> U {
U::load_relaxed(self.v.get())
}
fn store_release(&self, val: U) {
unsafe { U::store_release::<C>(self.v.get(), val) }
}
}
pub struct Queue<T, N, U = usize, C = MultiCore>(
#[doc(hidden)] pub crate::i::Queue<GenericArray<T, N>, U, C>,
)
where
N: ArrayLength<T>,
U: sealed::Uxx,
C: sealed::XCore;
impl<T, N, U, C> Queue<T, N, U, C>
where
N: ArrayLength<T>,
U: sealed::Uxx,
C: sealed::XCore,
{
pub fn capacity(&self) -> U {
U::saturate(N::to_usize())
}
pub fn is_empty(&self) -> bool {
self.len_usize() == 0
}
pub fn iter(&self) -> Iter<'_, T, N, U, C> {
Iter {
rb: self,
index: 0,
len: self.len_usize(),
}
}
pub fn iter_mut(&mut self) -> IterMut<'_, T, N, U, C> {
let len = self.len_usize();
IterMut {
rb: self,
index: 0,
len,
}
}
fn len_usize(&self) -> usize {
let head = self.0.head.load_relaxed().into();
let tail = self.0.tail.load_relaxed().into();
U::truncate(tail.wrapping_sub(head)).into()
}
}
impl<T, N, U, C> Drop for Queue<T, N, U, C>
where
N: ArrayLength<T>,
U: sealed::Uxx,
C: sealed::XCore,
{
fn drop(&mut self) {
for item in self {
unsafe {
ptr::drop_in_place(item);
}
}
}
}
impl<T, N, U, C> fmt::Debug for Queue<T, N, U, C>
where
N: ArrayLength<T>,
T: fmt::Debug,
U: sealed::Uxx,
C: sealed::XCore,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_list().entries(self.iter()).finish()
}
}
impl<T, N, U, C> hash::Hash for Queue<T, N, U, C>
where
N: ArrayLength<T>,
T: hash::Hash,
U: sealed::Uxx,
C: sealed::XCore,
{
fn hash<H: hash::Hasher>(&self, state: &mut H) {
for t in self.iter() {
hash::Hash::hash(t, state);
}
}
}
impl<T, N, U, C> hash32::Hash for Queue<T, N, U, C>
where
N: ArrayLength<T>,
T: hash32::Hash,
U: sealed::Uxx,
C: sealed::XCore,
{
fn hash<H: hash32::Hasher>(&self, state: &mut H) {
for t in self.iter() {
hash32::Hash::hash(t, state);
}
}
}
impl<'a, T, N, U, C> IntoIterator for &'a Queue<T, N, U, C>
where
N: ArrayLength<T>,
U: sealed::Uxx,
C: sealed::XCore,
{
type Item = &'a T;
type IntoIter = Iter<'a, T, N, U, C>;
fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}
impl<'a, T, N, U, C> IntoIterator for &'a mut Queue<T, N, U, C>
where
N: ArrayLength<T>,
U: sealed::Uxx,
C: sealed::XCore,
{
type Item = &'a mut T;
type IntoIter = IterMut<'a, T, N, U, C>;
fn into_iter(self) -> Self::IntoIter {
self.iter_mut()
}
}
macro_rules! impl_ {
($uxx:ident, $uxx_sc:ident) => {
impl<T, N> Queue<T, N, $uxx, MultiCore>
where
N: ArrayLength<T>,
{
pub fn $uxx() -> Self {
Queue(crate::i::Queue::$uxx())
}
}
impl<A> crate::i::Queue<A, $uxx, MultiCore> {
pub const fn $uxx() -> Self {
crate::i::Queue {
buffer: MaybeUninit::uninit(),
head: Atomic::new(0),
tail: Atomic::new(0),
}
}
}
impl<T, N> Queue<T, N, $uxx, SingleCore>
where
N: ArrayLength<T>,
{
pub unsafe fn $uxx_sc() -> Self {
Queue(crate::i::Queue::$uxx_sc())
}
}
impl<A> crate::i::Queue<A, $uxx, SingleCore> {
pub const unsafe fn $uxx_sc() -> Self {
crate::i::Queue {
buffer: MaybeUninit::uninit(),
head: Atomic::new(0),
tail: Atomic::new(0),
}
}
}
impl<T, N, C> Queue<T, N, $uxx, C>
where
N: ArrayLength<T>,
C: sealed::XCore,
{
pub fn peek(&self) -> Option<&T> {
let cap = self.capacity();
let head = self.0.head.get();
let tail = self.0.tail.get();
let p = self.0.buffer.as_ptr();
if *head != *tail {
let item = unsafe { &*(p as *const T).add(usize::from(*head % cap)) };
Some(item)
} else {
None
}
}
pub fn dequeue(&mut self) -> Option<T> {
let cap = self.capacity();
let head = self.0.head.get_mut();
let tail = self.0.tail.get_mut();
let p = self.0.buffer.as_ptr();
if *head != *tail {
let item = unsafe { (p as *const T).add(usize::from(*head % cap)).read() };
*head = head.wrapping_add(1);
Some(item)
} else {
None
}
}
pub fn enqueue(&mut self, item: T) -> Result<(), T> {
let cap = self.capacity();
let head = *self.0.head.get_mut();
let tail = *self.0.tail.get_mut();
if tail.wrapping_sub(head) > cap - 1 {
Err(item)
} else {
unsafe { self.enqueue_unchecked(item) }
Ok(())
}
}
pub unsafe fn enqueue_unchecked(&mut self, item: T) {
let cap = self.capacity();
let tail = self.0.tail.get_mut();
(self.0.buffer.as_mut_ptr() as *mut T)
.add(usize::from(*tail % cap))
.write(item);
*tail = tail.wrapping_add(1);
}
pub fn len(&self) -> $uxx {
let head = self.0.head.load_relaxed();
let tail = self.0.tail.load_relaxed();
tail.wrapping_sub(head)
}
}
impl<T, N, C> Clone for Queue<T, N, $uxx, C>
where
T: Clone,
N: ArrayLength<T>,
C: sealed::XCore,
{
fn clone(&self) -> Self {
let mut new: Queue<T, N, $uxx, C> = Queue(crate::i::Queue {
buffer: MaybeUninit::uninit(),
head: Atomic::new(0),
tail: Atomic::new(0),
});
for s in self.iter() {
unsafe {
new.enqueue_unchecked(s.clone());
}
}
new
}
}
};
}
impl<A> crate::i::Queue<A, usize, MultiCore> {
pub const fn new() -> Self {
crate::i::Queue::usize()
}
}
impl<T, N> Queue<T, N, usize, MultiCore>
where
N: ArrayLength<T>,
{
pub fn new() -> Self {
Queue(crate::i::Queue::new())
}
}
impl<A> crate::i::Queue<A, usize, SingleCore> {
pub const unsafe fn new_sc() -> Self {
crate::i::Queue::usize_sc()
}
}
impl<T, N> Queue<T, N, usize, SingleCore>
where
N: ArrayLength<T>,
{
pub unsafe fn new_sc() -> Self {
Queue(crate::i::Queue::new_sc())
}
}
impl_!(u8, u8_sc);
impl_!(u16, u16_sc);
impl_!(usize, usize_sc);
impl<T, N, U, C, N2, U2, C2> PartialEq<Queue<T, N2, U2, C2>> for Queue<T, N, U, C>
where
T: PartialEq,
N: ArrayLength<T>,
U: sealed::Uxx,
C: sealed::XCore,
N2: ArrayLength<T>,
U2: sealed::Uxx,
C2: sealed::XCore,
{
fn eq(&self, other: &Queue<T, N2, U2, C2>) -> bool {
self.len_usize() == other.len_usize()
&& self.iter().zip(other.iter()).all(|(v1, v2)| v1 == v2)
}
}
impl<T, N, U, C> Eq for Queue<T, N, U, C>
where
T: Eq,
N: ArrayLength<T>,
U: sealed::Uxx,
C: sealed::XCore,
{
}
pub struct Iter<'a, T, N, U, C>
where
N: ArrayLength<T>,
U: sealed::Uxx,
C: sealed::XCore,
{
rb: &'a Queue<T, N, U, C>,
index: usize,
len: usize,
}
impl<'a, T, N, U, C> Clone for Iter<'a, T, N, U, C>
where
N: ArrayLength<T>,
U: sealed::Uxx,
C: sealed::XCore,
{
fn clone(&self) -> Self {
Self {
rb: self.rb,
index: self.index,
len: self.len,
}
}
}
pub struct IterMut<'a, T, N, U, C>
where
N: ArrayLength<T>,
U: sealed::Uxx,
C: sealed::XCore,
{
rb: &'a mut Queue<T, N, U, C>,
index: usize,
len: usize,
}
macro_rules! iterator {
(struct $name:ident -> $elem:ty, $ptr:ty, $asptr:ident, $mkref:ident) => {
impl<'a, T, N, U, C> Iterator for $name<'a, T, N, U, C>
where
N: ArrayLength<T>,
U: sealed::Uxx,
C: sealed::XCore,
{
type Item = $elem;
fn next(&mut self) -> Option<$elem> {
if self.index < self.len {
let head = self.rb.0.head.load_relaxed().into();
let cap = self.rb.capacity().into();
let ptr = self.rb.0.buffer.$asptr() as $ptr;
let i = (head + self.index) % cap;
self.index += 1;
Some(unsafe { $mkref!(*ptr.offset(i as isize)) })
} else {
None
}
}
}
impl<'a, T, N, U, C> DoubleEndedIterator for $name<'a, T, N, U, C>
where
N: ArrayLength<T>,
U: sealed::Uxx,
C: sealed::XCore,
{
fn next_back(&mut self) -> Option<$elem> {
if self.index < self.len {
let head = self.rb.0.head.load_relaxed().into();
let cap = self.rb.capacity().into();
let ptr = self.rb.0.buffer.$asptr() as $ptr;
let i = (head + self.len - 1) % cap;
self.len -= 1;
Some(unsafe { $mkref!(*ptr.offset(i as isize)) })
} else {
None
}
}
}
};
}
macro_rules! make_ref {
($e:expr) => {
&($e)
};
}
macro_rules! make_ref_mut {
($e:expr) => {
&mut ($e)
};
}
iterator!(struct Iter -> &'a T, *const T, as_ptr, make_ref);
iterator!(struct IterMut -> &'a mut T, *mut T, as_mut_ptr, make_ref_mut);
#[cfg(test)]
mod tests {
use hash32::Hasher;
use crate::{consts::*, spsc::Queue};
#[test]
fn static_new() {
static mut _Q: Queue<i32, U4> = Queue(crate::i::Queue::new());
}
#[test]
fn drop() {
struct Droppable;
impl Droppable {
fn new() -> Self {
unsafe {
COUNT += 1;
}
Droppable
}
}
impl Drop for Droppable {
fn drop(&mut self) {
unsafe {
COUNT -= 1;
}
}
}
static mut COUNT: i32 = 0;
{
let mut v: Queue<Droppable, U4> = Queue::new();
v.enqueue(Droppable::new()).ok().unwrap();
v.enqueue(Droppable::new()).ok().unwrap();
v.dequeue().unwrap();
}
assert_eq!(unsafe { COUNT }, 0);
{
let mut v: Queue<Droppable, U4> = Queue::new();
v.enqueue(Droppable::new()).ok().unwrap();
v.enqueue(Droppable::new()).ok().unwrap();
}
assert_eq!(unsafe { COUNT }, 0);
}
#[test]
fn full() {
let mut rb: Queue<i32, U4> = Queue::new();
rb.enqueue(0).unwrap();
rb.enqueue(1).unwrap();
rb.enqueue(2).unwrap();
rb.enqueue(3).unwrap();
assert!(rb.enqueue(4).is_err());
}
#[test]
fn iter() {
let mut rb: Queue<i32, U4> = Queue::new();
rb.enqueue(0).unwrap();
rb.enqueue(1).unwrap();
rb.enqueue(2).unwrap();
let mut items = rb.iter();
assert_eq!(items.next(), Some(&0));
assert_eq!(items.next(), Some(&1));
assert_eq!(items.next(), Some(&2));
assert_eq!(items.next(), None);
}
#[test]
fn iter_double_ended() {
let mut rb: Queue<i32, U4> = Queue::new();
rb.enqueue(0).unwrap();
rb.enqueue(1).unwrap();
rb.enqueue(2).unwrap();
let mut items = rb.iter();
assert_eq!(items.next(), Some(&0));
assert_eq!(items.next_back(), Some(&2));
assert_eq!(items.next(), Some(&1));
assert_eq!(items.next(), None);
assert_eq!(items.next_back(), None);
}
#[test]
fn iter_overflow() {
let mut rb: Queue<i32, U4, u8> = Queue::u8();
rb.enqueue(0).unwrap();
for _ in 0..300 {
let mut items = rb.iter_mut();
assert_eq!(items.next(), Some(&mut 0));
assert_eq!(items.next(), None);
rb.dequeue().unwrap();
rb.enqueue(0).unwrap();
}
}
#[test]
fn iter_mut() {
let mut rb: Queue<i32, U4> = Queue::new();
rb.enqueue(0).unwrap();
rb.enqueue(1).unwrap();
rb.enqueue(2).unwrap();
let mut items = rb.iter_mut();
assert_eq!(items.next(), Some(&mut 0));
assert_eq!(items.next(), Some(&mut 1));
assert_eq!(items.next(), Some(&mut 2));
assert_eq!(items.next(), None);
}
#[test]
fn iter_mut_double_ended() {
let mut rb: Queue<i32, U4> = Queue::new();
rb.enqueue(0).unwrap();
rb.enqueue(1).unwrap();
rb.enqueue(2).unwrap();
let mut items = rb.iter_mut();
assert_eq!(items.next(), Some(&mut 0));
assert_eq!(items.next_back(), Some(&mut 2));
assert_eq!(items.next(), Some(&mut 1));
assert_eq!(items.next(), None);
assert_eq!(items.next_back(), None);
}
#[test]
fn sanity() {
let mut rb: Queue<i32, U4> = Queue::new();
assert_eq!(rb.dequeue(), None);
rb.enqueue(0).unwrap();
assert_eq!(rb.dequeue(), Some(0));
assert_eq!(rb.dequeue(), None);
}
#[test]
#[cfg(feature = "smaller-atomics")]
fn u8() {
let mut rb: Queue<u8, U256, _> = Queue::u8();
for _ in 0..255 {
rb.enqueue(0).unwrap();
}
assert!(rb.enqueue(0).is_err());
}
#[test]
fn wrap_around() {
let mut rb: Queue<i32, U3> = Queue::new();
rb.enqueue(0).unwrap();
rb.enqueue(1).unwrap();
rb.enqueue(2).unwrap();
rb.dequeue().unwrap();
rb.dequeue().unwrap();
rb.dequeue().unwrap();
rb.enqueue(3).unwrap();
rb.enqueue(4).unwrap();
assert_eq!(rb.len(), 2);
}
#[test]
fn ready_flag() {
let mut rb: Queue<i32, U2> = Queue::new();
let (mut p, mut c) = rb.split();
assert_eq!(c.ready(), false);
assert_eq!(p.ready(), true);
p.enqueue(0).unwrap();
assert_eq!(c.ready(), true);
assert_eq!(p.ready(), true);
p.enqueue(1).unwrap();
assert_eq!(c.ready(), true);
assert_eq!(p.ready(), false);
c.dequeue().unwrap();
assert_eq!(c.ready(), true);
assert_eq!(p.ready(), true);
c.dequeue().unwrap();
assert_eq!(c.ready(), false);
assert_eq!(p.ready(), true);
}
#[test]
fn clone() {
let mut rb1: Queue<i32, U4> = Queue::new();
rb1.enqueue(0).unwrap();
rb1.enqueue(0).unwrap();
rb1.dequeue().unwrap();
rb1.enqueue(0).unwrap();
let rb2 = rb1.clone();
assert_eq!(rb1.capacity(), rb2.capacity());
assert_eq!(rb1.len_usize(), rb2.len_usize());
assert!(rb1.iter().zip(rb2.iter()).all(|(v1, v2)| v1 == v2));
}
#[test]
fn eq() {
let mut rb1: Queue<i32, U4> = Queue::new();
rb1.enqueue(0).unwrap();
rb1.enqueue(0).unwrap();
rb1.dequeue().unwrap();
rb1.enqueue(0).unwrap();
let mut rb2: Queue<i32, U4> = Queue::new();
rb2.enqueue(0).unwrap();
rb2.enqueue(0).unwrap();
assert!(rb1 == rb2);
assert!(rb2 == rb1);
rb1.enqueue(0).unwrap();
assert!(rb1 != rb2);
rb2.enqueue(1).unwrap();
assert!(rb1 != rb2);
assert!(rb1 == rb1);
assert!(rb2 == rb2);
}
#[test]
fn hash_equality() {
let rb1 = {
let mut rb1: Queue<i32, U4> = Queue::new();
rb1.enqueue(0).unwrap();
rb1.enqueue(0).unwrap();
rb1.dequeue().unwrap();
rb1.enqueue(0).unwrap();
rb1
};
let rb2 = {
let mut rb2: Queue<i32, U4> = Queue::new();
rb2.enqueue(0).unwrap();
rb2.enqueue(0).unwrap();
rb2
};
let hash1 = {
let mut hasher1 = hash32::FnvHasher::default();
hash32::Hash::hash(&rb1, &mut hasher1);
let hash1 = hasher1.finish();
hash1
};
let hash2 = {
let mut hasher2 = hash32::FnvHasher::default();
hash32::Hash::hash(&rb2, &mut hasher2);
let hash2 = hasher2.finish();
hash2
};
assert_eq!(hash1, hash2);
}
}