256 lines
		
	
	
		
			6.0 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
		
		
			
		
	
	
			256 lines
		
	
	
		
			6.0 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
|  | #[cfg(feature = "async")]
 | ||
|  | use {
 | ||
|  |     flume::*,
 | ||
|  |     futures::{stream::FuturesUnordered, StreamExt, TryFutureExt},
 | ||
|  |     async_std::prelude::FutureExt,
 | ||
|  |     std::time::Duration,
 | ||
|  | };
 | ||
|  | use futures::{stream, Stream};
 | ||
|  | 
 | ||
|  | #[cfg(feature = "async")]
 | ||
|  | #[test]
 | ||
|  | fn stream_recv() {
 | ||
|  |     let (tx, rx) = unbounded();
 | ||
|  | 
 | ||
|  |     let t = std::thread::spawn(move || {
 | ||
|  |         std::thread::sleep(std::time::Duration::from_millis(250));
 | ||
|  |         tx.send(42u32).unwrap();
 | ||
|  |         println!("sent");
 | ||
|  |     });
 | ||
|  | 
 | ||
|  |     async_std::task::block_on(async {
 | ||
|  |         println!("receiving...");
 | ||
|  |         let x = rx.stream().next().await;
 | ||
|  |         println!("received");
 | ||
|  |         assert_eq!(x, Some(42));
 | ||
|  |     });
 | ||
|  | 
 | ||
|  |     t.join().unwrap();
 | ||
|  | }
 | ||
|  | 
 | ||
|  | #[cfg(feature = "async")]
 | ||
|  | #[test]
 | ||
|  | fn stream_recv_disconnect() {
 | ||
|  |     let (tx, rx) = bounded::<i32>(0);
 | ||
|  | 
 | ||
|  |     let t = std::thread::spawn(move || {
 | ||
|  |         tx.send(42);
 | ||
|  |         std::thread::sleep(std::time::Duration::from_millis(250));
 | ||
|  |         drop(tx)
 | ||
|  |     });
 | ||
|  | 
 | ||
|  |     async_std::task::block_on(async {
 | ||
|  |         let mut stream = rx.into_stream();
 | ||
|  |         assert_eq!(stream.next().await, Some(42));
 | ||
|  |         assert_eq!(stream.next().await, None);
 | ||
|  |     });
 | ||
|  | 
 | ||
|  |     t.join().unwrap();
 | ||
|  | }
 | ||
|  | 
 | ||
|  | #[cfg(feature = "async")]
 | ||
|  | #[test]
 | ||
|  | fn stream_recv_drop_recv() {
 | ||
|  |     let (tx, rx) = bounded::<i32>(10);
 | ||
|  | 
 | ||
|  |     let rx2 = rx.clone();
 | ||
|  |     let mut stream = rx.into_stream();
 | ||
|  | 
 | ||
|  |     async_std::task::block_on(async {
 | ||
|  |         let res = async_std::future::timeout(
 | ||
|  |             std::time::Duration::from_millis(500),
 | ||
|  |             stream.next()
 | ||
|  |         ).await;
 | ||
|  | 
 | ||
|  |         assert!(res.is_err());
 | ||
|  |     });
 | ||
|  | 
 | ||
|  |     let t = std::thread::spawn(move || {
 | ||
|  |         async_std::task::block_on(async {
 | ||
|  |             rx2.stream().next().await
 | ||
|  |         })
 | ||
|  |     });
 | ||
|  | 
 | ||
|  |     std::thread::sleep(std::time::Duration::from_millis(500));
 | ||
|  | 
 | ||
|  |     tx.send(42).unwrap();
 | ||
|  | 
 | ||
|  |     drop(stream);
 | ||
|  | 
 | ||
|  |     assert_eq!(t.join().unwrap(), Some(42))
 | ||
|  | }
 | ||
|  | 
 | ||
|  | #[cfg(feature = "async")]
 | ||
|  | #[test]
 | ||
|  | fn r#stream_drop_send_disconnect() {
 | ||
|  |     let (tx, rx) = bounded::<i32>(1);
 | ||
|  | 
 | ||
|  |     let t = std::thread::spawn(move || {
 | ||
|  |         std::thread::sleep(std::time::Duration::from_millis(250));
 | ||
|  |         drop(tx);
 | ||
|  |     });
 | ||
|  | 
 | ||
|  |     async_std::task::block_on(async {
 | ||
|  |         let mut stream = rx.into_stream();
 | ||
|  |         assert_eq!(stream.next().await, None);
 | ||
|  |     });
 | ||
|  | 
 | ||
|  |     t.join().unwrap();
 | ||
|  | }
 | ||
|  | 
 | ||
|  | #[cfg(feature = "async")]
 | ||
|  | #[async_std::test]
 | ||
|  | async fn stream_send_1_million_no_drop_or_reorder() {
 | ||
|  |     #[derive(Debug)]
 | ||
|  |     enum Message {
 | ||
|  |         Increment {
 | ||
|  |             old: u64,
 | ||
|  |         },
 | ||
|  |         ReturnCount,
 | ||
|  |     }
 | ||
|  | 
 | ||
|  |     let (tx, rx) = unbounded();
 | ||
|  | 
 | ||
|  |     let t = async_std::task::spawn(async move {
 | ||
|  |         let mut count = 0u64;
 | ||
|  |         let mut stream = rx.into_stream();
 | ||
|  | 
 | ||
|  |         while let Some(Message::Increment { old }) = stream.next().await {
 | ||
|  |             assert_eq!(old, count);
 | ||
|  |             count += 1;
 | ||
|  |         }
 | ||
|  | 
 | ||
|  |         count
 | ||
|  |     });
 | ||
|  | 
 | ||
|  |     for next in 0..1_000_000 {
 | ||
|  |         tx.send(Message::Increment { old: next }).unwrap();
 | ||
|  |     }
 | ||
|  | 
 | ||
|  |     tx.send(Message::ReturnCount).unwrap();
 | ||
|  | 
 | ||
|  |     let count = t.await;
 | ||
|  |     assert_eq!(count, 1_000_000)
 | ||
|  | }
 | ||
|  | 
 | ||
|  | #[cfg(feature = "async")]
 | ||
|  | #[async_std::test]
 | ||
|  | async fn parallel_streams_and_async_recv() {
 | ||
|  |     let (tx, rx) = flume::unbounded();
 | ||
|  |     let rx = ℞
 | ||
|  |     let send_fut = async move {
 | ||
|  |         let n_sends: usize = 100000;
 | ||
|  |         for _ in 0..n_sends {
 | ||
|  |             tx.send_async(()).await.unwrap();
 | ||
|  |         }
 | ||
|  |     };
 | ||
|  | 
 | ||
|  |     async_std::task::spawn(
 | ||
|  |         send_fut
 | ||
|  |             .timeout(Duration::from_secs(5))
 | ||
|  |             .map_err(|_| panic!("Send timed out!"))
 | ||
|  |     );
 | ||
|  | 
 | ||
|  |     let mut futures_unordered = (0..250)
 | ||
|  |         .map(|n| async move {
 | ||
|  |             if n % 2 == 0 {
 | ||
|  |                 let mut stream = rx.stream();
 | ||
|  |                 while let Some(()) = stream.next().await {}
 | ||
|  |             } else {
 | ||
|  |                 while let Ok(()) = rx.recv_async().await {}
 | ||
|  |             }
 | ||
|  | 
 | ||
|  |         })
 | ||
|  |         .collect::<FuturesUnordered<_>>();
 | ||
|  | 
 | ||
|  |     let recv_fut = async {
 | ||
|  |         while futures_unordered.next().await.is_some() {}
 | ||
|  |     };
 | ||
|  | 
 | ||
|  |     recv_fut
 | ||
|  |         .timeout(Duration::from_secs(5))
 | ||
|  |         .map_err(|_| panic!("Receive timed out!"))
 | ||
|  |         .await
 | ||
|  |         .unwrap();
 | ||
|  | }
 | ||
|  | 
 | ||
|  | #[cfg(feature = "async")]
 | ||
|  | #[test]
 | ||
|  | fn stream_no_double_wake() {
 | ||
|  |     use std::sync::atomic::{AtomicUsize, Ordering};
 | ||
|  |     use std::sync::Arc;
 | ||
|  |     use std::pin::Pin;
 | ||
|  |     use std::task::Context;
 | ||
|  |     use futures::task::{waker, ArcWake};
 | ||
|  |     use futures::Stream;
 | ||
|  | 
 | ||
|  |     let count = Arc::new(AtomicUsize::new(0));
 | ||
|  | 
 | ||
|  |     // all this waker does is count how many times it is called
 | ||
|  |     struct CounterWaker {
 | ||
|  |         count: Arc<AtomicUsize>,
 | ||
|  |     }
 | ||
|  | 
 | ||
|  |     impl ArcWake for CounterWaker {
 | ||
|  |         fn wake_by_ref(arc_self: &Arc<Self>) {
 | ||
|  |             arc_self.count.fetch_add(1, Ordering::SeqCst);
 | ||
|  |         }
 | ||
|  |     }
 | ||
|  | 
 | ||
|  |     // create waker and context
 | ||
|  |     let w = CounterWaker {
 | ||
|  |         count: count.clone(),
 | ||
|  |     };
 | ||
|  |     let w = waker(Arc::new(w));
 | ||
|  |     let cx = &mut Context::from_waker(&w);
 | ||
|  | 
 | ||
|  |     // create unbounded channel
 | ||
|  |     let (tx, rx) = unbounded::<()>();
 | ||
|  |     let mut stream = rx.stream();
 | ||
|  | 
 | ||
|  |     // register waker with stream
 | ||
|  |     let _ = Pin::new(&mut stream).poll_next(cx);
 | ||
|  | 
 | ||
|  |     // send multiple items
 | ||
|  |     tx.send(()).unwrap();
 | ||
|  |     tx.send(()).unwrap();
 | ||
|  |     tx.send(()).unwrap();
 | ||
|  | 
 | ||
|  |     // verify that stream is only woken up once.
 | ||
|  |     assert_eq!(count.load(Ordering::SeqCst), 1);
 | ||
|  | }
 | ||
|  | 
 | ||
|  | #[cfg(feature = "async")]
 | ||
|  | #[async_std::test]
 | ||
|  | async fn stream_forward_issue_55() { // https://github.com/zesterer/flume/issues/55
 | ||
|  |     fn dummy_stream() -> impl Stream<Item = usize> {
 | ||
|  |         stream::unfold(0, |count| async move {
 | ||
|  |             if count < 1000 {
 | ||
|  |                 Some((count, count + 1))
 | ||
|  |             } else {
 | ||
|  |                 None
 | ||
|  |             }
 | ||
|  |         })
 | ||
|  |     }
 | ||
|  | 
 | ||
|  |     let (send_task, recv_task) = {
 | ||
|  |         use futures::SinkExt;
 | ||
|  |         let (tx, rx) = flume::bounded(100);
 | ||
|  | 
 | ||
|  |         let send_task = dummy_stream()
 | ||
|  |             .map(|i| Ok(i))
 | ||
|  |             .forward(tx.into_sink().sink_map_err(|e| {
 | ||
|  |                 panic!("send error:{:#?}", e)
 | ||
|  |             }));
 | ||
|  | 
 | ||
|  |         let recv_task = rx
 | ||
|  |             .into_stream()
 | ||
|  |             .for_each(|item| async move {});
 | ||
|  |         (send_task, recv_task)
 | ||
|  |     };
 | ||
|  | 
 | ||
|  |     let jh = async_std::task::spawn(send_task);
 | ||
|  |     async_std::task::block_on(recv_task);
 | ||
|  |     jh.await.unwrap();
 | ||
|  | }
 |