use core::{marker::PhantomData, ptr::NonNull};
use generic_array::ArrayLength;
use crate::{
sealed::spsc as sealed,
spsc::{MultiCore, Queue},
};
impl<T, N, U, C> Queue<T, N, U, C>
where
    N: ArrayLength<T>,
    U: sealed::Uxx,
    C: sealed::XCore,
{
    /// Splits the queue into a `Producer` and a `Consumer` end point.
    ///
    /// Both end points hold a pointer to this queue and carry the `'rb`
    /// lifetime of the mutable borrow, so the queue cannot be used directly
    /// while either of them is alive.
    pub fn split<'rb>(&'rb mut self) -> (Producer<'rb, T, N, U, C>, Consumer<'rb, T, N, U, C>) {
        // Derive the raw pointer from the `&mut` exactly once and copy it.
        // Coercing `self` to a raw pointer a second time would re-borrow the
        // unique reference and could invalidate the first pointer under the
        // Stacked Borrows aliasing model.
        let rb = NonNull::from(self);

        (
            Producer {
                rb,
                _marker: PhantomData,
            },
            Consumer {
                rb,
                _marker: PhantomData,
            },
        )
    }
}
/// The dequeuing end point of a queue that has been `split`.
///
/// The type parameters mirror those of the `Queue` it points to; `'a` is the
/// lifetime of the mutable borrow taken by `Queue::split`.
pub struct Consumer<
    'a,
    T,
    N: ArrayLength<T>,
    U: sealed::Uxx = usize,
    C: sealed::XCore = MultiCore,
> {
    // Pointer to the queue this end point was split from.
    rb: NonNull<Queue<T, N, U, C>>,
    // Ties this handle to the `'a` borrow of the queue without storing a reference.
    _marker: PhantomData<&'a ()>,
}
// `Consumer` holds a `NonNull`, which is not `Send` by itself, so the impl is
// spelled out by hand. `T: Send` is required because `dequeue` hands `T`
// values out to whichever context owns the consumer.
unsafe impl<'a, T: Send, N: ArrayLength<T>, U: sealed::Uxx, C: sealed::XCore> Send
    for Consumer<'a, T, N, U, C>
{
}
/// The enqueuing end point of a queue that has been `split`.
///
/// The type parameters mirror those of the `Queue` it points to; `'a` is the
/// lifetime of the mutable borrow taken by `Queue::split`.
pub struct Producer<
    'a,
    T,
    N: ArrayLength<T>,
    U: sealed::Uxx = usize,
    C: sealed::XCore = MultiCore,
> {
    // Pointer to the queue this end point was split from.
    rb: NonNull<Queue<T, N, U, C>>,
    // Ties this handle to the `'a` borrow of the queue without storing a reference.
    _marker: PhantomData<&'a ()>,
}
// `Producer` holds a `NonNull`, which is not `Send` by itself, so the impl is
// spelled out by hand. `T: Send` is required because `enqueue` moves `T`
// values into the queue from whichever context owns the producer.
unsafe impl<'a, T: Send, N: ArrayLength<T>, U: sealed::Uxx, C: sealed::XCore> Send
    for Producer<'a, T, N, U, C>
{
}
macro_rules! impl_ {
($uxx:ident) => {
impl<'a, T, N, C> Consumer<'a, T, N, $uxx, C>
where
    N: ArrayLength<T>,
    C: sealed::XCore,
{
    /// Returns `true` if there is at least one item available to dequeue.
    pub fn ready(&self) -> bool {
        let (head, tail) = unsafe {
            let rb = self.rb.as_ref();
            (rb.0.head.load_relaxed(), rb.0.tail.load_acquire())
        };
        head != tail
    }

    /// Returns a reference to the item at the front of the queue without
    /// removing it, or `None` when `head == tail` (nothing to read).
    pub fn peek(&self) -> Option<&T> {
        let (head, tail) = unsafe {
            let rb = self.rb.as_ref();
            (rb.0.head.load_relaxed(), rb.0.tail.load_acquire())
        };
        if head == tail {
            return None;
        }
        Some(unsafe { self._peek(head) })
    }

    /// Removes and returns the item at the front of the queue, or `None`
    /// when `head == tail` (nothing to read).
    pub fn dequeue(&mut self) -> Option<T> {
        let (head, tail) = unsafe {
            let rb = self.rb.as_ref();
            (rb.0.head.load_relaxed(), rb.0.tail.load_acquire())
        };
        if head == tail {
            return None;
        }
        Some(unsafe { self._dequeue(head) })
    }

    /// Removes and returns the item at the front of the queue without
    /// checking that there is one.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that the queue is not empty; otherwise a
    /// slot that does not hold a pending item is read. The condition is only
    /// verified through a `debug_assert`.
    pub unsafe fn dequeue_unchecked(&mut self) -> T {
        let rb = self.rb.as_ref();
        let head = rb.0.head.load_relaxed();
        debug_assert_ne!(head, rb.0.tail.load_acquire());
        self._dequeue(head)
    }

    /// Returns the capacity reported by the underlying queue.
    pub fn capacity(&self) -> $uxx {
        unsafe { self.rb.as_ref().capacity() }
    }

    /// Returns the number of items currently in the queue, computed as the
    /// wrapping difference `tail - head`.
    pub fn len(&self) -> $uxx {
        let (head, tail) = unsafe {
            let rb = self.rb.as_ref();
            (rb.0.head.load_relaxed(), rb.0.tail.load_acquire())
        };
        tail.wrapping_sub(head)
    }

    // Borrows the slot `head % capacity` of the buffer.
    //
    // Safety: the caller must have checked that this slot holds a pending item.
    unsafe fn _peek(&self, head: $uxx) -> &T {
        let rb = self.rb.as_ref();
        let slot = usize::from(head % rb.capacity());
        let base = rb.0.buffer.as_ptr() as *const T;
        &*base.add(slot)
    }

    // Moves the item out of slot `head % capacity`, then publishes the
    // advanced `head` with a release store.
    //
    // Safety: the caller must have checked that this slot holds a pending item.
    unsafe fn _dequeue(&mut self, head: $uxx) -> T {
        let rb = self.rb.as_ref();
        let slot = usize::from(head % rb.capacity());
        let base = rb.0.buffer.as_ptr() as *const T;
        let item = base.add(slot).read();
        // The read must happen before the slot is handed back via `head`.
        rb.0.head.store_release(head.wrapping_add(1));
        item
    }
}
impl<'a, T, N, C> Producer<'a, T, N, $uxx, C>
where
    N: ArrayLength<T>,
    C: sealed::XCore,
{
    /// Returns `true` if there is room to enqueue at least one more item,
    /// i.e. `head + capacity != tail` (wrapping).
    pub fn ready(&self) -> bool {
        let cap = unsafe { self.rb.as_ref().capacity() };
        let tail = unsafe { self.rb.as_ref().0.tail.load_relaxed() };
        let head = unsafe { self.rb.as_ref().0.head.load_acquire() };
        head.wrapping_add(cap) != tail
    }

    /// Adds `item` to the end of the queue.
    ///
    /// Returns `Err(item)`, handing the value back, when the queue already
    /// holds `capacity` items.
    pub fn enqueue(&mut self, item: T) -> Result<(), T> {
        let cap = unsafe { self.rb.as_ref().capacity() };
        let tail = unsafe { self.rb.as_ref().0.tail.load_relaxed() };
        let head = unsafe { self.rb.as_ref().0.head.load_acquire() };

        // `>= cap` instead of `> cap - 1`: same result for every non-zero
        // capacity, but it cannot underflow when the capacity is zero (a
        // zero-capacity queue is simply always full).
        if tail.wrapping_sub(head) >= cap {
            Err(item)
        } else {
            unsafe { self._enqueue(tail, item) };
            Ok(())
        }
    }

    /// Returns the capacity reported by the underlying queue.
    pub fn capacity(&self) -> $uxx {
        unsafe { self.rb.as_ref().capacity() }
    }

    /// Returns the number of items currently in the queue, computed as the
    /// wrapping difference `tail - head`.
    pub fn len(&self) -> $uxx {
        let head = unsafe { self.rb.as_ref().0.head.load_acquire() };
        let tail = unsafe { self.rb.as_ref().0.tail.load_relaxed() };
        tail.wrapping_sub(head)
    }

    /// Adds `item` to the end of the queue without checking for free space.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that the queue is not full; otherwise the
    /// slot at `tail % capacity` is overwritten without any check. The
    /// condition is only verified through a `debug_assert`.
    pub unsafe fn enqueue_unchecked(&mut self, item: T) {
        let tail = self.rb.as_ref().0.tail.load_relaxed();
        // Same "full" condition as `ready` / `enqueue`: the queue is full
        // once `tail - head` reaches the capacity. Everything is kept inside
        // the assertion so release builds perform no extra loads.
        debug_assert!(
            tail.wrapping_sub(self.rb.as_ref().0.head.load_acquire())
                < self.rb.as_ref().capacity()
        );
        self._enqueue(tail, item);
    }

    // Writes `item` into slot `tail % capacity`, then publishes the advanced
    // `tail` with a release store.
    //
    // Safety: the caller must have checked that this slot is free.
    //
    // NOTE(review): `tail` wraps at the width of the index type; if the
    // capacity is not a power of two, `tail % cap` is not continuous across
    // that wrap-around -- confirm how capacities are constrained elsewhere.
    unsafe fn _enqueue(&mut self, tail: $uxx, item: T) {
        let rb = self.rb.as_mut();
        let cap = rb.capacity();
        (rb.0.buffer.as_mut_ptr() as *mut T)
            .add(usize::from(tail % cap))
            .write(item);
        // The write must be visible before the slot is published via `tail`.
        rb.0.tail.store_release(tail.wrapping_add(1));
    }
}
};
}
// Instantiate the `Consumer` / `Producer` method impls once per index type:
// `u8`, `u16` and `usize`.
impl_!(u8);
impl_!(u16);
impl_!(usize);
#[cfg(test)]
mod tests {
    use crate::{consts::*, spsc::Queue};

    /// A freshly split queue yields nothing; an enqueued item comes back out.
    #[test]
    fn sanity() {
        let mut queue: Queue<i32, U2> = Queue::new();
        let (mut producer, mut consumer) = queue.split();

        assert_eq!(consumer.dequeue(), None);

        producer.enqueue(0).unwrap();
        assert_eq!(consumer.dequeue(), Some(0));
    }
}