429 lines
9.4 KiB
Rust
Raw Normal View History

use std::time::{Instant, Duration};
use flume::*;
// Sends 1000 integers over an unbounded channel, checks that `try_recv`
// returns them in the order they were sent, and finally checks that one more
// `try_recv` yields an error.
#[test]
fn send_recv() {
    let (tx, rx) = unbounded();

    (0..1000).for_each(|value| tx.send(value).unwrap());

    for expected in 0..1000 {
        let received = rx.try_recv().unwrap();
        assert_eq!(received, expected);
    }

    assert!(rx.try_recv().is_err());
}
// Queues 0..1000, drops the sender, then sums everything produced by
// `rx.iter()` and compares it with the sum of the same range.
#[test]
fn iter() {
    let (tx, rx) = unbounded();

    for value in 0..1000 {
        tx.send(value).unwrap();
    }
    drop(tx);

    let received_total: u32 = rx.iter().sum();
    let expected_total: u32 = (0..1000).sum();
    assert_eq!(received_total, expected_total);
}
// Queues 0..1000 and sums what `rx.try_iter()` produces while the sender is
// still alive; the total must match the sum of the range.
#[test]
fn try_iter() {
    let (tx, rx) = unbounded();

    (0..1000).for_each(|value| tx.send(value).unwrap());

    let received_total = rx.try_iter().sum::<u32>();
    assert_eq!(received_total, (0..1000).sum::<u32>());
}
// Same as the single-threaded `iter` check, except each of the 1000 values is
// sent from its own thread through a cloned sender. The original sender is
// dropped before the receiver is iterated.
#[test]
fn iter_threaded() {
    let (tx, rx) = unbounded();

    for value in 0..1000 {
        let worker_tx = tx.clone();
        std::thread::spawn(move || {
            worker_tx.send(value).unwrap();
        });
    }
    drop(tx);

    let received_total: u32 = rx.iter().sum();
    assert_eq!(received_total, (0..1000).sum::<u32>());
}
// Checks `send_timeout` on a channel of capacity 1:
//   * the first send succeeds,
//   * the second send fails, and the time it took must lie strictly between
//     `dur - max_error` and `dur + max_error`,
//   * exactly one message can be drained afterwards,
//   * once the receiver is dropped, a further `send_timeout` fails.
#[cfg_attr(any(target_os = "macos", windows), ignore)] // FIXME #41
#[test]
fn send_timeout() {
    let dur = Duration::from_millis(350);
    let max_error = Duration::from_millis(5);
    let dur_min = dur.checked_sub(max_error).unwrap();
    let dur_max = dur.checked_add(max_error).unwrap();

    let (tx, rx) = bounded(1);

    assert!(tx.send_timeout(42, dur).is_ok());

    let started = Instant::now();
    assert!(tx.send_timeout(43, dur).is_err());
    let waited = started.elapsed();
    // Reject anything outside the open interval (dur_min, dur_max).
    if waited <= dur_min || waited >= dur_max {
        panic!("timeout exceeded: {:?}", waited);
    }

    let drained = rx.drain().count();
    assert_eq!(drained, 1);

    drop(rx);
    assert!(tx.send_timeout(42, dur).is_err());
}
// Checks `recv_timeout` on an empty unbounded channel: the call must fail
// after a wait strictly between `dur - max_error` and `dur + max_error`.
// A message is then sent, and a second `recv_timeout` must return it with
// less than `max_error` having passed since the first call returned.
#[cfg_attr(any(target_os = "macos", windows), ignore)] // FIXME #41
#[test]
fn recv_timeout() {
    let dur = Duration::from_millis(350);
    let max_error = Duration::from_millis(5);
    let dur_min = dur.checked_sub(max_error).unwrap();
    let dur_max = dur.checked_add(max_error).unwrap();

    let (tx, rx) = unbounded();

    let started = Instant::now();
    assert!(rx.recv_timeout(dur).is_err());
    let timed_out_at = Instant::now();
    let waited = timed_out_at.duration_since(started);
    // Reject anything outside the open interval (dur_min, dur_max).
    if waited <= dur_min || waited >= dur_max {
        panic!("timeout exceeded: {:?}", waited);
    }

    tx.send(42).unwrap();
    assert_eq!(rx.recv_timeout(dur), Ok(42));
    // Measured from the moment the first call returned.
    assert!(timed_out_at.elapsed() < max_error);
}
// Deadline-based twin of the `recv_timeout` check: `recv_deadline` on an
// empty channel must fail after a wait strictly between `dur - max_error` and
// `dur + max_error`. After a message is sent, a second `recv_deadline` must
// return it with less than `max_error` having passed since the first call
// returned.
#[cfg_attr(any(target_os = "macos", windows), ignore)] // FIXME #41
#[test]
fn recv_deadline() {
    let dur = Duration::from_millis(350);
    let max_error = Duration::from_millis(5);
    let dur_min = dur.checked_sub(max_error).unwrap();
    let dur_max = dur.checked_add(max_error).unwrap();

    let (tx, rx) = unbounded();

    let started = Instant::now();
    let first_deadline = started.checked_add(dur).unwrap();
    assert!(rx.recv_deadline(first_deadline).is_err());
    let timed_out_at = Instant::now();
    let waited = timed_out_at.duration_since(started);
    // Reject anything outside the open interval (dur_min, dur_max).
    if waited <= dur_min || waited >= dur_max {
        panic!("timeout exceeded: {:?}", waited);
    }

    tx.send(42).unwrap();
    let second_deadline = timed_out_at.checked_add(dur).unwrap();
    assert_eq!(rx.recv_deadline(second_deadline), Ok(42));
    // Measured from the moment the first call returned.
    assert!(timed_out_at.elapsed() < max_error);
}
// After a `recv_timeout` that fails on an empty channel, a message sent
// afterwards must still be delivered by a plain `recv`.
#[test]
fn recv_timeout_missed_send() {
    let (tx, rx) = bounded(10);

    let first_attempt = rx.recv_timeout(Duration::from_millis(100));
    assert!(first_attempt.is_err());

    tx.send(42).unwrap();
    let delivered = rx.recv();
    assert_eq!(delivered, Ok(42));
}
// With the only sender dropped, `recv` must return an error.
#[test]
fn disconnect_tx() {
    let (tx, rx) = unbounded::<()>();
    drop(tx);

    let outcome = rx.recv();
    assert!(outcome.is_err());
}
// With the only receiver dropped, `send` must return an error.
#[test]
fn disconnect_rx() {
    let (tx, rx) = unbounded();
    drop(rx);

    let outcome = tx.send(0);
    assert!(outcome.is_err());
}
// First part: queue 0..100 and check that `drain` yields values summing to
// the sum of that range.
// Second part: queue 0..100 twice, take one message with `recv`, then walk
// the receiver alongside the expected remainder (1..100 followed by 0..100)
// and compare pairwise.
#[test]
fn drain() {
    let (tx, rx) = unbounded();

    (0..100).for_each(|value| tx.send(value).unwrap());
    let drained_total: u32 = rx.drain().sum();
    assert_eq!(drained_total, (0..100).sum::<u32>());

    for _batch in 0..2 {
        for value in 0..100 {
            tx.send(value).unwrap();
        }
    }

    rx.recv().unwrap();

    // The expected sequence is the left-hand side of the zip, so iteration
    // stops when it is exhausted rather than asking the receiver for more.
    let expected = (1u32..100).chain(0..100);
    for (want, got) in expected.zip(rx) {
        assert_eq!(want, got);
    }
}
// Exercises `try_send` on a channel of capacity 5: five sends succeed, a
// sixth fails, a send succeeds again after one message has been received,
// and sends fail once the receiver is dropped.
#[test]
fn try_send() {
    let (tx, rx) = bounded(5);

    (0..5).for_each(|value| tx.try_send(value).unwrap());
    assert!(tx.try_send(42).is_err());

    assert_eq!(rx.recv(), Ok(0));
    assert_eq!(tx.try_send(42), Ok(()));
    assert_eq!(rx.recv(), Ok(1));

    drop(rx);
    assert!(tx.try_send(42).is_err());
}
// Exercises blocking `send` on a channel of capacity 5.
//
// Single-threaded part: fill the channel, receive one message, send one
// more, and check that a further `try_send` fails. The channel is then
// drained.
//
// Threaded part: 100 threads each send 0..10000 through cloned senders; the
// receiver sums everything and the total must equal 100 times the sum of
// that range. After all threads are joined, `recv` must return an error.
#[test]
fn send_bounded() {
    let (tx, rx) = bounded(5);

    (0..5).for_each(|_| tx.send(42).unwrap());
    let _ = rx.recv().unwrap();
    tx.send(42).unwrap();
    assert!(tx.try_send(42).is_err());

    rx.drain();

    let workers: Vec<_> = (0..100)
        .map(|_| {
            let worker_tx = tx.clone();
            std::thread::spawn(move || {
                for value in 0..10000 {
                    worker_tx.send(value).unwrap();
                }
            })
        })
        .collect();
    drop(tx);

    let received_total = rx.iter().sum::<u64>();
    let per_worker_total = (0..10000).sum::<u64>();
    assert_eq!(received_total, per_worker_total * 100);

    for worker in workers {
        worker.join().unwrap();
    }

    assert!(rx.recv().is_err());
}
// Exercises a zero-capacity channel. In each of five rounds a helper thread
// first checks that `try_send` fails, then does a blocking `send` and asserts
// that it took more than 100ms. Meanwhile the main thread sleeps for one
// second before receiving, then joins the helper.
#[test]
fn rendezvous() {
    let (tx, rx) = bounded(0);

    for i in 0..5 {
        let worker_tx = tx.clone();
        let worker = std::thread::spawn(move || {
            assert!(worker_tx.try_send(()).is_err());

            let started = Instant::now();
            worker_tx.send(()).unwrap();
            let blocked_for = started.elapsed();

            assert!(blocked_for > Duration::from_millis(100), "iter = {}", i);
        });

        std::thread::sleep(Duration::from_millis(1000));
        rx.recv().unwrap();

        worker.join().unwrap();
    }
}
// Fan-in stress test. Each of `thread_num` worker threads owns its own
// unbounded channel and forwards everything it receives to one shared "main"
// channel. In each of ten rounds the test sends `msg_num` messages to every
// worker and then receives `thread_num * msg_num` messages from the main
// channel. After the worker senders are dropped, the main receiver must
// report an error.
#[test]
fn hydra() {
    let thread_num = 32;
    let msg_num = 1000;

    let (main_tx, main_rx) = unbounded::<()>();

    let txs: Vec<_> = (0..thread_num)
        .map(|_| {
            let forward_tx = main_tx.clone();
            let (tx, rx) = unbounded();
            std::thread::spawn(move || {
                for msg in rx.iter() {
                    forward_tx.send(msg).unwrap();
                }
            });
            tx
        })
        .collect();
    // Only the workers' clones of the main sender remain after this.
    drop(main_tx);

    for _round in 0..10 {
        for tx in &txs {
            for _ in 0..msg_num {
                tx.send(Default::default()).unwrap();
            }
        }

        for _ in 0..thread_num {
            for _ in 0..msg_num {
                main_rx.recv().unwrap();
            }
        }
    }

    drop(txs);
    assert!(main_rx.recv().is_err());
}
// Chain test. Builds `thread_num` forwarding threads, each reading from a
// fresh channel of capacity 100 and writing to the sender that was current
// before it; `main_tx` ends up pointing at the newest channel while `main_rx`
// still belongs to the original capacity-1 channel. In each of ten rounds a
// producer thread sends `msg_num` messages into `main_tx` and the test
// receives `msg_num` messages from `main_rx`.
#[test]
fn robin() {
    let thread_num = 32;
    let msg_num = 10;

    let (mut main_tx, main_rx) = bounded::<()>(1);

    for _ in 0..thread_num {
        let (next_tx, rx) = bounded(100);
        // The new sender becomes `main_tx`; the previous one is the
        // forwarding target for this thread.
        let downstream_tx = std::mem::replace(&mut main_tx, next_tx);
        std::thread::spawn(move || {
            for msg in rx.iter() {
                downstream_tx.send(msg).unwrap();
            }
        });
    }

    for _round in 0..10 {
        let producer_tx = main_tx.clone();
        std::thread::spawn(move || {
            for _ in 0..msg_num {
                producer_tx.send(Default::default()).unwrap();
            }
        });

        for _ in 0..msg_num {
            main_rx.recv().unwrap();
        }
    }
}
// Exercises `Selector` for both receiving and sending.
//
// Receive half: two threads each send one `Foo` (tagged with its index) after
// a 250ms sleep, one into a capacity-1 channel and one into an unbounded
// channel. The selector waits on both receivers; whichever message it
// returns, the other one must then be available from the other receiver.
//
// Send half: `Foo(42)` is sent on the capacity-1 channel, then a selector
// sends `Foo(43)` on the same channel while a helper thread, after a 100ms
// sleep, receives both values in order.
#[cfg(feature = "select")]
#[test]
fn select_general() {
    #[derive(Debug, PartialEq)]
    struct Foo(usize);

    let (tx0, rx0) = bounded(1);
    let (tx1, rx1) = unbounded();

    for (i, t) in vec![tx0.clone(), tx1].into_iter().enumerate() {
        std::thread::spawn(move || {
            std::thread::sleep(Duration::from_millis(250));
            // The send result is intentionally discarded here.
            let _ = t.send(Foo(i));
        });
    }

    let x = Selector::new()
        .recv(&rx0, |x| x)
        .recv(&rx1, |x| x)
        .wait()
        .unwrap();

    // `assert_eq!` (rather than `assert!(a == b)`) so a failure prints both
    // operands, matching the assertions further down.
    if x == Foo(0) {
        assert_eq!(rx1.recv().unwrap(), Foo(1));
    } else {
        assert_eq!(rx0.recv().unwrap(), Foo(0));
    }

    tx0.send(Foo(42)).unwrap();

    let t = std::thread::spawn(move || {
        std::thread::sleep(Duration::from_millis(100));
        assert_eq!(rx0.recv().unwrap(), Foo(42));
        assert_eq!(rx0.recv().unwrap(), Foo(43));
    });

    Selector::new()
        .send(&tx0, Foo(43), |x| x)
        .wait()
        .unwrap();

    t.join().unwrap();
}
// Payload type that deliberately does not implement `Debug`.
struct MessageWithoutDebug(u32);

// Compile-time check rather than a behavioural one: every error type returned
// by the send/receive calls below must coerce to `&dyn std::error::Error`
// even though the channel payload has no `Debug` impl. The bodies of the
// `Err` branches exist only to force that coercion to type-check.
#[test]
fn std_error_without_debug() {
    let (tx, rx) = unbounded::<MessageWithoutDebug>();

    if let Err(e) = tx.send(MessageWithoutDebug(1)) {
        let _std_err: &dyn std::error::Error = &e;
    }

    if let Err(e) = rx.recv() {
        let _std_err: &dyn std::error::Error = &e;
    }

    if let Err(e) = tx.try_send(MessageWithoutDebug(2)) {
        let _std_err: &dyn std::error::Error = &e;
    }

    if let Err(e) = rx.try_recv() {
        let _std_err: &dyn std::error::Error = &e;
    }

    if let Err(e) = tx.send_timeout(MessageWithoutDebug(3), Duration::from_secs(1000000)) {
        let _std_err: &dyn std::error::Error = &e;
    }

    if let Err(e) = rx.recv_timeout(Duration::from_secs(10000000)) {
        let _std_err: &dyn std::error::Error = &e;
    }
}
// After the only strong sender is dropped, a weak handle obtained from it
// must fail to upgrade, the receiver must report itself disconnected, and
// `try_recv` must return an error.
#[test]
fn weak_close() {
    let (tx, rx) = unbounded::<()>();
    let weak = tx.downgrade();
    drop(tx);

    let upgraded = weak.upgrade();
    assert!(upgraded.is_none());

    assert!(rx.is_disconnected());

    let outcome = rx.try_recv();
    assert!(outcome.is_err());
}
// A sender obtained by upgrading a weak handle must keep the channel
// connected after the original sender is dropped, and must be able to
// deliver a message.
#[test]
fn weak_upgrade() {
    let (tx, rx) = unbounded();
    let weak = tx.downgrade();
    let upgraded_tx = weak.upgrade().unwrap();
    drop(tx);

    let disconnected = rx.is_disconnected();
    assert!(!disconnected);

    upgraded_tx.send(()).unwrap();
    let delivered = rx.try_recv();
    assert!(delivered.is_ok());
}