1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
use std::any::Any;
use std::error::Error;
use std::fmt;
use {StartSend, Sink, Stream, Poll, Async, AsyncSink};
use sync::BiLock;
#[derive(Debug)]
pub struct SplitStream<S>(BiLock<S>);
impl<S> SplitStream<S> {
pub fn reunite(self, other: SplitSink<S>) -> Result<S, ReuniteError<S>> {
other.reunite(self)
}
}
impl<S: Stream> Stream for SplitStream<S> {
type Item = S::Item;
type Error = S::Error;
fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
match self.0.poll_lock() {
Async::Ready(mut inner) => inner.poll(),
Async::NotReady => Ok(Async::NotReady),
}
}
}
#[derive(Debug)]
pub struct SplitSink<S>(BiLock<S>);
impl<S> SplitSink<S> {
pub fn reunite(self, other: SplitStream<S>) -> Result<S, ReuniteError<S>> {
self.0.reunite(other.0).map_err(|err| {
ReuniteError(SplitSink(err.0), SplitStream(err.1))
})
}
}
impl<S: Sink> Sink for SplitSink<S> {
type SinkItem = S::SinkItem;
type SinkError = S::SinkError;
fn start_send(&mut self, item: S::SinkItem)
-> StartSend<S::SinkItem, S::SinkError>
{
match self.0.poll_lock() {
Async::Ready(mut inner) => inner.start_send(item),
Async::NotReady => Ok(AsyncSink::NotReady(item)),
}
}
fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
match self.0.poll_lock() {
Async::Ready(mut inner) => inner.poll_complete(),
Async::NotReady => Ok(Async::NotReady),
}
}
fn close(&mut self) -> Poll<(), S::SinkError> {
match self.0.poll_lock() {
Async::Ready(mut inner) => inner.close(),
Async::NotReady => Ok(Async::NotReady),
}
}
}
pub fn split<S: Stream + Sink>(s: S) -> (SplitSink<S>, SplitStream<S>) {
let (a, b) = BiLock::new(s);
let read = SplitStream(a);
let write = SplitSink(b);
(write, read)
}
pub struct ReuniteError<T>(pub SplitSink<T>, pub SplitStream<T>);
impl<T> fmt::Debug for ReuniteError<T> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_tuple("ReuniteError")
.field(&"...")
.finish()
}
}
impl<T> fmt::Display for ReuniteError<T> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "tried to reunite a SplitStream and SplitSink that don't form a pair")
}
}
impl<T: Any> Error for ReuniteError<T> {
fn description(&self) -> &str {
"tried to reunite a SplitStream and SplitSink that don't form a pair"
}
}