1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
use crate::loom::sync::atomic::AtomicPtr;
use crate::runtime::task::{Header, Task};

use std::marker::PhantomData;
use std::ptr::{self, NonNull};
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};

/// Concurrent stack of tasks, used to pass ownership of a task from one worker
/// to another.
pub(crate) struct TransferStack<T: 'static> {
    head: AtomicPtr<Header>,
    _p: PhantomData<T>,
}

impl<T: 'static> TransferStack<T> {
    pub(crate) fn new() -> TransferStack<T> {
        TransferStack {
            head: AtomicPtr::new(ptr::null_mut()),
            _p: PhantomData,
        }
    }

    pub(crate) fn push(&self, task: Task<T>) {
        let task = task.into_raw();

        // We don't care about any memory associated w/ setting the `head`
        // field, just the current value.
        //
        // The compare-exchange creates a release sequence.
        let mut curr = self.head.load(Relaxed);

        loop {
            unsafe {
                task.as_ref()
                    .stack_next
                    .with_mut(|ptr| *ptr = NonNull::new(curr))
            };

            let res = self
                .head
                .compare_exchange(curr, task.as_ptr() as *mut _, Release, Relaxed);

            match res {
                Ok(_) => return,
                Err(actual) => {
                    curr = actual;
                }
            }
        }
    }

    pub(crate) fn drain(&self) -> impl Iterator<Item = Task<T>> {
        struct Iter<T: 'static>(Option<NonNull<Header>>, PhantomData<T>);

        impl<T: 'static> Iterator for Iter<T> {
            type Item = Task<T>;

            fn next(&mut self) -> Option<Task<T>> {
                let task = self.0?;

                // Move the cursor forward
                self.0 = unsafe { task.as_ref().stack_next.with(|ptr| *ptr) };

                // Return the task
                unsafe { Some(Task::from_raw(task)) }
            }
        }

        impl<T: 'static> Drop for Iter<T> {
            fn drop(&mut self) {
                use std::process;

                if self.0.is_some() {
                    // we have bugs
                    process::abort();
                }
            }
        }

        let ptr = self.head.swap(ptr::null_mut(), Acquire);
        Iter(NonNull::new(ptr), PhantomData)
    }
}