Files
aho_corasick
ansi_term
arrayvec
atty
backtrace
backtrace_sys
base64
bincode
bitflags
byteorder
bytes
c2_chacha
capnp
capnp_futures
capnp_rpc
cfg_if
chrono
clap
crossbeam_deque
crossbeam_epoch
crossbeam_queue
crossbeam_utils
ctrlc
daemon
failure
failure_derive
flexi_logger
fnv
futures
getrandom
glob
hid_io
api
device
module
protocol
hidapi
install_service
iovec
lazy_static
libc
lock_api
log
memchr
memoffset
mio
mio_uds
nanoid
net2
nix
nodrop
num_cpus
num_integer
num_traits
open
parking_lot
parking_lot_core
pem
ppv_lite86
proc_macro2
quote
rand
rand_chacha
rand_core
rand_hc
rand_isaac
rand_jitter
rand_os
rand_pcg
rand_xorshift
rcgen
regex
regex_syntax
remove_dir_all
ring
rustc_demangle
rustls
scoped_tls
scopeguard
sct
serde
slab
smallvec
spin
stream_cancel
strsim
syn
synstructure
tempfile
textwrap
thread_local
time
tokio
tokio_codec
tokio_core
tokio_current_thread
tokio_executor
tokio_fs
tokio_io
tokio_reactor
tokio_rustls
tokio_sync
tokio_tcp
tokio_threadpool
tokio_timer
tokio_udp
tokio_uds
unicode_width
unicode_xid
untrusted
vec_map
void
webpki
windows_service
x11
xcb
xkbcommon
yasna
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
use futures::{future::Shared, sync::oneshot, Async, Future, IntoFuture, Poll, Stream};
use Trigger;

/// A stream combinator which takes elements from a stream until a future resolves.
///
/// This structure is produced by the [`StreamExt::take_until`] method.
#[derive(Clone, Debug)]
pub struct TakeUntil<S, F> {
    /// The wrapped stream whose items and errors are forwarded.
    stream: S,
    /// The future that is polled ahead of `stream`; when it resolves successfully the
    /// combinator reports end-of-stream.
    until: F,
    /// Set to `true` once `until` has failed. From then on `until` is never polled again and
    /// `stream` is forwarded without restriction.
    free: bool,
}

/// Extension trait for every `Stream`, adding a `take_until` combinator that cuts the stream
/// short as soon as a separate future completes.
pub trait StreamExt: Stream {
    /// Yield items from this stream only until `until` resolves.
    ///
    /// The returned [`TakeUntil`] forwards whatever the underlying stream produces for as long as
    /// `until` is still pending. As soon as `until` resolves successfully, the combined stream
    /// reports `None` instead of consulting the underlying stream.
    ///
    /// Should `until` fail instead of resolving, the cut-off is disabled: the underlying stream
    /// is then forwarded without any further restriction.
    ///
    /// ```
    /// # extern crate stream_cancel;
    /// extern crate tokio;
    /// extern crate futures;
    ///
    /// use stream_cancel::StreamExt;
    /// use tokio::prelude::*;
    ///
    /// let listener = tokio::net::TcpListener::bind(&"0.0.0.0:0".parse().unwrap()).unwrap();
    /// let (tx, rx) = futures::sync::oneshot::channel();
    ///
    /// let mut rt = tokio::runtime::Runtime::new().unwrap();
    /// rt.spawn(
    ///     listener
    ///         .incoming()
    ///         .take_until(rx.map_err(|_| ()))
    ///         .map_err(|e| eprintln!("accept failed = {:?}", e))
    ///         .for_each(|sock| {
    ///             let (reader, writer) = sock.split();
    ///             tokio::spawn(
    ///                 tokio::io::copy(reader, writer)
    ///                     .map(|amt| println!("wrote {:?} bytes", amt))
    ///                     .map_err(|err| eprintln!("IO error {:?}", err)),
    ///             )
    ///         }),
    /// );
    ///
    /// // tell the listener to stop accepting new connections
    /// tx.send(()).unwrap();
    /// rt.shutdown_on_idle().wait().unwrap();
    /// ```
    fn take_until<U>(self, until: U) -> TakeUntil<Self, U::Future>
    where
        U: IntoFuture<Item = (), Error = ()>,
        Self: Sized,
    {
        // Convert up front so the combinator stores a ready-to-poll future.
        let until = until.into_future();
        TakeUntil {
            stream: self,
            until,
            // The cut-off starts out armed; it is only disabled if `until` fails.
            free: false,
        }
    }
}

// Blanket implementation: every `Stream` automatically gains the `StreamExt` methods.
impl<S: Stream> StreamExt for S {}

impl<S, F> Stream for TakeUntil<S, F>
where
    S: Stream,
    F: Future<Item = (), Error = ()>,
{
    type Item = S::Item;
    type Error = S::Error;

    /// Polls the cut-off future first (unless it has been disabled) and then, if the stream is
    /// still allowed to run, forwards the poll to the wrapped stream.
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        // The cut-off future failed earlier, so the wrapped stream runs unrestricted.
        if self.free {
            return self.stream.poll();
        }

        match self.until.poll() {
            // The cut-off fired: report end-of-stream without touching the wrapped stream.
            Ok(Async::Ready(())) => Ok(Async::Ready(None)),
            // Still armed but not fired yet: keep forwarding.
            Ok(Async::NotReady) => self.stream.poll(),
            // A failed cut-off is treated as "never stop": remember that so the failed
            // future is not polled again, then keep forwarding.
            Err(()) => {
                self.free = true;
                self.stream.poll()
            }
        }
    }
}

/// A `Tripwire` is a convenient mechanism for implementing graceful shutdown over many
/// asynchronous streams. A `Tripwire` is a `Future` that is `Clone`, and that can be passed to
/// [`StreamExt::take_until`]. All `Tripwire` clones are associated with a single [`Trigger`],
/// which is then used to signal that all the associated streams should be terminated.
///
/// `Tripwire` is internally implemented using a `Shared<oneshot::Receiver<()>>`, with the
/// `Trigger` holding the associated `oneshot::Sender`. There is very little magic.
///
/// When polled, a `Tripwire` resolves to `()` once the shared receiver resolves, and fails with
/// `()` if the shared receiver fails; the receiver's own value and error are discarded.
#[derive(Clone, Debug)]
pub struct Tripwire(Shared<oneshot::Receiver<()>>);

impl Tripwire {
    /// Create a fresh `Tripwire` together with the [`Trigger`] that controls it.
    ///
    /// The two halves are backed by a single oneshot channel: the `Trigger` receives the sending
    /// side, and the `Tripwire` wraps the receiving side as a shared future.
    pub fn new() -> (Trigger, Self) {
        let (sender, receiver) = oneshot::channel();
        let trigger = Trigger(Some(sender));
        let tripwire = Tripwire(receiver.shared());
        (trigger, tripwire)
    }
}

impl Future for Tripwire {
    type Item = ();
    type Error = ();

    /// Polls the shared receiver, discarding both its value and its error so that callers only
    /// observe "resolved", "not ready", or "failed".
    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
        // Collapse any error from the shared receiver into the unit error and bail out early.
        let state = self.0.poll().map_err(|_| ())?;

        // Strip the payload from a ready result; readiness itself is the only signal.
        Ok(match state {
            Async::Ready(_) => Async::Ready(()),
            Async::NotReady => Async::NotReady,
        })
    }
}