Files
addr2line
adler
aho_corasick
ansi_term
arraydeque
as_slice
atty
backtrace
base64
bincode_core
bitflags
byteorder
bytes
capnp
capnp_futures
capnp_rpc
cfg_if
chrono
clap
ctrlc
derivative
dlib
downcast_rs
enumflags2
enumflags2_derive
evdev_rs
evdev_sys
failure
failure_derive
flexi_logger
futures
futures_channel
futures_core
futures_executor
futures_io
futures_macro
futures_sink
futures_task
futures_util
async_await
future
io
lock
sink
stream
task
generic_array
getrandom
gimli
glob
hash32
heapless
hid_io_core
hid_io_protocol
hidapi
install_service
lazy_static
libc
libloading
libudev_sys
log
memchr
memmap
miniz_oxide
mio
nanoid
nix
num_cpus
num_enum
num_enum_derive
num_integer
num_traits
object
once_cell
open
pem
pin_project_lite
pin_utils
ppv_lite86
proc_macro2
proc_macro_hack
proc_macro_nested
quote
rand
rand_chacha
rand_core
rcgen
regex
regex_syntax
remove_dir_all
ring
rustc_demangle
rustls
scoped_tls
sct
serde
serde_derive
slab
smallvec
spin
stable_deref_trait
strsim
syn
synstructure
sys_info
tempfile
textwrap
thiserror
thiserror_impl
time
tokio
future
io
loom
macros
net
park
runtime
stream
sync
task
time
util
tokio_macros
tokio_rustls
tokio_util
typenum
udev
uhid_virt
uhidrs_sys
unicode_width
unicode_xid
untrusted
vec_map
wayland_client
wayland_commons
wayland_sys
webpki
which
x11
xcb
xkbcommon
yansi
yasna
zwp_virtual_keyboard
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
use super::assert_sink;
use crate::unfold_state::UnfoldState;
use core::{future::Future, pin::Pin};
use futures_core::ready;
use futures_core::task::{Context, Poll};
use futures_sink::Sink;
use pin_project_lite::pin_project;

pin_project! {
    /// Sink for the [`unfold`] function.
    ///
    /// Pairs the user-supplied function with an [`UnfoldState`] that holds
    /// either the current state value or the future produced for the item
    /// most recently passed to `start_send`.
    #[derive(Debug)]
    #[must_use = "sinks do nothing unless polled"]
    pub struct Unfold<T, F, R> {
        // Invoked by `start_send` with the current state value and the new item;
        // returns the future that processes that item.
        function: F,
        // Current state value, or the in-flight future created by `function`.
        // Pinned because that future is polled in place by `poll_flush`.
        #[pin]
        state: UnfoldState<T, R>,
    }
}

/// Build a sink from an initial state value and a function that handles one
/// item at a time.
///
/// Every item sent into the sink is passed to `function` together with the
/// current state value. The future returned by `function` resolves either to
/// the state value used for the next item or to an error, which is reported
/// through the sink.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::sink::{self, SinkExt};
///
/// let unfold = sink::unfold(0, |mut sum, i: i32| {
///     async move {
///         sum += i;
///         eprintln!("{}", i);
///         Ok::<_, futures::never::Never>(sum)
///     }
/// });
/// futures::pin_mut!(unfold);
/// unfold.send(5).await?;
/// # Ok::<(), futures::never::Never>(()) }).unwrap();
/// ```
pub fn unfold<T, F, R, Item, E>(init: T, function: F) -> Unfold<T, F, R>
where
    F: FnMut(T, Item) -> R,
    R: Future<Output = Result<T, E>>,
{
    // The sink starts out holding the caller's initial value, with no future
    // in flight.
    let state = UnfoldState::Value { value: init };
    let sink = Unfold { function, state };

    // Compile-time check that the result really is a `Sink<Item, Error = E>`.
    assert_sink::<Item, E, _>(sink)
}

/// Drives the wrapped function as a [`Sink`].
///
/// `start_send` combines the current state value with the incoming item to
/// create a future; flushing polls that future until it yields the next state
/// value (or an error).
impl<T, F, R, Item, E> Sink<Item> for Unfold<T, F, R>
where
    F: FnMut(T, Item) -> R,
    R: Future<Output = Result<T, E>>,
{
    type Error = E;

    /// Waits for any in-flight future to finish by delegating to
    /// [`poll_flush`](Sink::poll_flush).
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.poll_flush(cx)
    }

    /// Hands `item` to the wrapped function and stores the resulting future.
    ///
    /// The future is not polled here; it is driven by `poll_ready`,
    /// `poll_flush` or `poll_close`.
    ///
    /// # Panics
    ///
    /// Panics if no state value is available, i.e. a previous item is still
    /// being processed or an earlier future resolved to an error.
    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
        let mut this = self.project();
        let future = match this.state.as_mut().take_value() {
            Some(value) => (this.function)(value, item),
            None => panic!("start_send called without poll_ready being called first"),
        };
        this.state.set(UnfoldState::Future { future });
        Ok(())
    }

    /// Polls the in-flight future, if any, to completion.
    ///
    /// On success the produced value becomes the state for the next item. On
    /// failure the error is returned once and the finished future is dropped,
    /// so later calls return `Ok(())` instead of polling it again.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let mut this = self.project();

        let future = match this.state.as_mut().project_future() {
            Some(future) => future,
            // No item is being processed, so there is nothing to flush.
            None => return Poll::Ready(Ok(())),
        };

        match ready!(future.poll(cx)) {
            Ok(value) => {
                this.state.set(UnfoldState::Value { value });
                Poll::Ready(Ok(()))
            }
            Err(err) => {
                // The future has completed and must never be polled again;
                // leaving it in place would make the next poll_ready /
                // poll_flush / poll_close poll a finished future.
                // NOTE(review): relies on `UnfoldState` having a data-less
                // `Empty` variant — confirm against `unfold_state.rs`.
                this.state.set(UnfoldState::Empty);
                Poll::Ready(Err(err))
            }
        }
    }

    /// Closing has no work beyond flushing, so this delegates to
    /// [`poll_flush`](Sink::poll_flush).
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.poll_flush(cx)
    }
}