1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
use crate::{inner::StreamProcessor, ProcessingError};
use tokio_sync::mpsc;
/// Bounded flavour of the queue, built on `tokio_sync::mpsc::channel`.
///
/// The whole module body is produced by the `implement!` macro. The tests in
/// this file use the `LazyQueue::new(processor, queue_len)` constructor that
/// this module exports.
pub mod bounded {
use super::*;
// NOTE(review): `implement!` is defined outside this file section; the notes on
// the individual arguments below are inferred from the argument values only.
// Confirm them against the macro definition.
implement!(
// Documentation text, presumably attached to the generated queue type.
"A clonable thread safe sink-like queue.",
// Sending half of the bounded channel (`Item` is presumably a generic
// parameter introduced by the macro).
mpsc::Sender<Item>,
// Receiving half of the bounded channel.
mpsc::Receiver<Item>,
// Extra constructor parameter list: the channel size.
(queue_len: usize),
// Expression that creates the channel from the parameters above.
mpsc::channel(queue_len),
// Error type of the sending half.
mpsc::error::SendError,
// Error type of the receiving half.
mpsc::error::RecvError,
);
}
/// Unbounded flavour of the queue, built on `tokio_sync::mpsc::unbounded_channel`.
///
/// The whole module body is produced by the `implement!` macro; unlike the
/// bounded variant, no size parameter is passed to the channel constructor.
pub mod unbounded {
use super::*;
// NOTE(review): `implement!` is defined outside this file section; the notes on
// the individual arguments below are inferred from the argument values only.
// Confirm them against the macro definition.
implement!(
// Documentation text, presumably attached to the generated queue type.
"A clonable thread safe sink-like queue.",
// Sending half of the unbounded channel (`Item` is presumably a generic
// parameter introduced by the macro).
mpsc::UnboundedSender<Item>,
// Receiving half of the unbounded channel.
mpsc::UnboundedReceiver<Item>,
// Empty extra-parameter list: the channel constructor below takes no arguments.
(),
// Expression that creates the channel.
mpsc::unbounded_channel(),
// Error type of the sending half.
mpsc::error::UnboundedSendError,
// Error type of the receiving half.
mpsc::error::UnboundedRecvError,
);
}
#[cfg(test)]
mod tests {
    use super::bounded::*;
    use crate::test::try_once;
    use futures::{stream::iter_ok, Future, Sink, Stream};
    use std::sync::mpsc::channel;
    use tokio::runtime::Runtime;
    use tokio_sync::watch;

    /// Exercises the bounded queue on a multi-threaded runtime.
    ///
    /// Every item is held back by the processor until a `watch` channel
    /// switches to `true`. While the gate is closed the test fills the queue
    /// with `CAPACITY` items, checks that pushing two more does not finish
    /// under `try_once`, then opens the gate and verifies that the first
    /// `CAPACITY` items were reported in the order they were sent.
    #[test]
    fn multithread() {
        const CAPACITY: usize = 3;

        // `gate` flips the watched flag; `done_rx` collects processed items.
        let (mut gate, gate_rx) = watch::channel(false);
        let (done_tx, done_rx) = channel();

        // The processor owns its own receiver handle; `gate_rx` itself stays
        // alive until the end of the test.
        let gate_for_processor = gate_rx.clone();
        let processor = move |item: u8| {
            let done_tx = done_tx.clone();
            // Resolves on the first `true` value seen on the watch channel.
            let gate_opened = gate_for_processor
                .clone()
                .filter(|&is_open| is_open)
                .into_future();
            gate_opened.then(move |_| done_tx.send(item))
        };

        let (queue, driver) = LazyQueue::new(processor, CAPACITY);
        let mut rt = Runtime::new().unwrap();
        rt.spawn(driver.map_err(|e| panic!("Error while processing a queue: {}", e)));

        // Exactly as many items as the queue was created with.
        let items = vec![0, 2, 1];
        assert_eq!(CAPACITY, items.len());

        let fill_queue = queue
            .send_all(iter_ok(items.clone()))
            .map(|(queue, _)| queue);
        let queue = rt.block_on(fill_queue).unwrap();

        // With the gate still closed, sending more items must not complete.
        let overflow = try_once(queue.send_all(iter_ok(vec![9, 10])));
        let maybe_queue = rt.block_on(overflow).unwrap();
        assert!(maybe_queue.is_none());

        // Open the gate and let the runtime drain all outstanding work.
        gate.broadcast(true).unwrap();
        rt.shutdown_on_idle().wait().unwrap();

        let processed = done_rx.iter().take(3).collect::<Vec<_>>();
        assert_eq!(items, processed);
    }
}