1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
use futures::future::Future;
use futures::channel::oneshot;
use futures::{AsyncWrite, AsyncWriteExt, StreamExt, TryFutureExt};
use capnp::{Error};
use crate::serialize::{AsOutputSegments};
enum Item<M> where M: AsOutputSegments {
Message(M, oneshot::Sender<M>),
Done(Result<(), Error>, oneshot::Sender<()>)
}
pub struct Sender<M> where M: AsOutputSegments {
sender: futures::channel::mpsc::UnboundedSender<Item<M>>,
}
impl <M> Clone for Sender<M> where M: AsOutputSegments {
fn clone(&self) -> Sender<M> {
Sender { sender: self.sender.clone() }
}
}
pub fn write_queue<W, M>(mut writer: W) -> (Sender<M>, impl Future<Output=Result<(),Error>> )
where W: AsyncWrite + Unpin , M: AsOutputSegments
{
let (tx, mut rx) = futures::channel::mpsc::unbounded();
let sender = Sender { sender: tx };
let queue = async move {
while let Some(item) = rx.next().await {
match item {
Item::Message(m, returner) => {
crate::serialize::write_message(&mut writer, &m).await?;
writer.flush().await?;
let _ = returner.send(m);
}
Item::Done(r, finisher) => {
let _ = finisher.send(());
return r;
}
}
}
Ok(())
};
(sender, queue)
}
impl <M> Sender<M> where M: AsOutputSegments {
pub fn send(&mut self, message: M) -> impl Future<Output=Result<M,Error>> + Unpin {
let (complete, oneshot) = oneshot::channel();
let _ = self.sender.unbounded_send(Item::Message(message, complete));
oneshot.map_err(
|oneshot::Canceled| Error::disconnected("WriteQueue has terminated".into()))
}
pub fn len(&mut self) -> usize {
unimplemented!()
}
pub fn terminate(&mut self, result: Result<(), Error>) -> impl Future<Output=Result<(),Error>> + Unpin {
let (complete, receiver) = oneshot::channel();
let _ = self.sender.unbounded_send(Item::Done(result, complete));
receiver.map_err(
|oneshot::Canceled| Error::disconnected("WriteQueue has terminated".into()))
}
}