1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
use crate::{inner::StreamProcessor, ProcessingError};
use futures::unsync::mpsc;
/// Queue flavour backed by `futures::unsync::mpsc::channel(queue_len)`.
///
/// Everything in this module is produced by the `implement!` macro, which is
/// defined outside this file. The test module below uses the result as
/// `LazyQueue::new(processor, capacity)`, which returns a `(queue, driver)` pair.
pub mod bounded {
// Brings the parent module's imports (`StreamProcessor`, `ProcessingError`, `mpsc`) into scope.
use super::*;
// NOTE(review): the per-argument notes below are inferred from the shape of each
// argument; confirm them against the `implement!` definition.
implement!(
// Presumably the documentation text attached to the generated queue type.
"A clonable single-threaded sink-like queue.",
// Sending half of the underlying channel.
mpsc::Sender<Item>,
// Receiving half of the underlying channel.
mpsc::Receiver<Item>,
// Presumably the extra constructor parameter(s); here the channel capacity.
(queue_len: usize),
// Expression that creates the channel, using the parameter declared above.
mpsc::channel(queue_len),
// Error type produced when sending into the channel fails.
mpsc::SendError<Item>,
// Purpose of this trailing `()` is not evident from this file — see the macro definition.
(),
);
}
/// Queue flavour backed by `futures::unsync::mpsc::unbounded()`.
///
/// Everything in this module is produced by the `implement!` macro, which is
/// defined outside this file. Unlike the `bounded` flavour, no capacity
/// parameter is passed to the macro.
pub mod unbounded {
// Brings the parent module's imports (`StreamProcessor`, `ProcessingError`, `mpsc`) into scope.
use super::*;
// NOTE(review): the per-argument notes below are inferred from the shape of each
// argument; confirm them against the `implement!` definition.
implement!(
// Presumably the documentation text attached to the generated queue type.
"A clonable single-threaded sink-like queue.",
// Sending half of the underlying channel.
mpsc::UnboundedSender<Item>,
// Receiving half of the underlying channel.
mpsc::UnboundedReceiver<Item>,
// Presumably the extra constructor parameter(s); empty for this flavour.
(),
// Expression that creates the channel.
mpsc::unbounded(),
// Error type produced when sending into the channel fails.
mpsc::SendError<Item>,
// Purpose of this trailing `()` is not evident from this file — see the macro definition.
(),
);
}
#[cfg(test)]
mod tests {
    use super::bounded::*;
    use crate::test::try_once;
    use futures::{stream::iter_ok, Future, Sink};
    use std::sync::mpsc::channel;
    use tokio::runtime::current_thread::Runtime;

    /// Exercises the bounded queue on a single-threaded runtime.
    ///
    /// The test sends `CAPACITY` items through the queue, then checks that a
    /// further `send_all` attempted through `try_once` yields `None`, and
    /// finally that the processor saw the first batch in order.
    #[test]
    fn singlethread() {
        const CAPACITY: usize = 3;

        // The processor forwards each item into a std channel so the test can
        // inspect what was actually processed.
        let (events_sender, events_receiver) = channel();
        let forward_to_channel = move |item: u8| events_sender.send(item);

        let (queue, driver) = LazyQueue::new(forward_to_channel, CAPACITY);

        let mut rt = Runtime::new().unwrap();
        rt.spawn(driver.map_err(|e| panic!("Error while processing a queue: {:?}", e)));

        // The first batch holds exactly as many items as the queue capacity.
        let expected = vec![0, 2, 1];
        assert_eq!(CAPACITY, expected.len());

        // `send_all` resolves to a `(sink, stream)` pair; only the sink is kept.
        let fill_queue = queue
            .send_all(iter_ok(expected.clone()))
            .map(|(sink, _)| sink);
        let queue = rt.block_on(fill_queue).unwrap();

        // A second batch attempted through `try_once` must not hand the queue
        // back (presumably because the send cannot complete in a single poll).
        let overflow = try_once(queue.send_all(iter_ok(vec![9, 10])));
        let maybe_queue = rt.block_on(overflow).unwrap();
        assert!(maybe_queue.is_none());

        // Let the runtime finish its spawned work before inspecting the output.
        rt.run().unwrap();

        let delivered = events_receiver
            .iter()
            .take(CAPACITY)
            .collect::<Vec<_>>();
        assert_eq!(expected, delivered);
    }
}