From fa91d7f856a9f0afc9689371ced6f1c941c346bc Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Sun, 3 Nov 2019 02:11:59 +0300 Subject: [PATCH] Stream::merge does not end prematurely if one stream is delayed (#437) * Stream::merge does not end prematurely if one stream is delayed * `cargo test` without features works * Stream::merge works correctly for unfused streams --- Cargo.toml | 8 +++ src/stream/stream/merge.rs | 26 ++++++---- src/stream/stream/mod.rs | 2 +- tests/stream.rs | 100 +++++++++++++++++++++++++++++++++++++ 4 files changed, 124 insertions(+), 12 deletions(-) create mode 100644 tests/stream.rs diff --git a/Cargo.toml b/Cargo.toml index b117b113..4e9dc09c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,3 +56,11 @@ futures-preview = { version = "=0.3.0-alpha.19", features = ["async-await"] } # These are used by the book for examples futures-channel-preview = "=0.3.0-alpha.19" futures-util-preview = "=0.3.0-alpha.19" + +[[test]] +name = "stream" +required-features = ["unstable"] + +[[example]] +name = "tcp-ipv4-and-6-echo" +required-features = ["unstable"] diff --git a/src/stream/stream/merge.rs b/src/stream/stream/merge.rs index d926ec4f..f3505aca 100644 --- a/src/stream/stream/merge.rs +++ b/src/stream/stream/merge.rs @@ -4,6 +4,9 @@ use std::task::{Context, Poll}; use futures_core::Stream; use pin_project_lite::pin_project; +use crate::prelude::*; +use crate::stream::Fuse; + pin_project! { /// A stream that merges two other streams into a single stream. /// @@ -17,15 +20,15 @@ pin_project! { #[derive(Debug)] pub struct Merge { #[pin] - left: L, + left: Fuse, #[pin] - right: R, + right: Fuse, } } -impl Merge { +impl Merge { pub(crate) fn new(left: L, right: R) -> Self { - Self { left, right } + Self { left: left.fuse(), right: right.fuse() } } } @@ -38,13 +41,14 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); - if let Poll::Ready(Some(item)) = this.left.poll_next(cx) { - // The first stream made progress. The Merge needs to be polled - // again to check the progress of the second stream. - cx.waker().wake_by_ref(); - Poll::Ready(Some(item)) - } else { - this.right.poll_next(cx) + match this.left.poll_next(cx) { + Poll::Ready(Some(item)) => Poll::Ready(Some(item)), + Poll::Ready(None) => this.right.poll_next(cx), + Poll::Pending => match this.right.poll_next(cx) { + Poll::Ready(Some(item)) => Poll::Ready(Some(item)), + Poll::Ready(None) => Poll::Pending, + Poll::Pending => Poll::Pending, + } } } } diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 4a3875c4..e182c033 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -1667,7 +1667,7 @@ extension_trait! { } #[doc = r#" - Searches for an element in a Stream that satisfies a predicate, returning + Searches for an element in a Stream that satisfies a predicate, returning its index. # Examples diff --git a/tests/stream.rs b/tests/stream.rs new file mode 100644 index 00000000..42a6191f --- /dev/null +++ b/tests/stream.rs @@ -0,0 +1,100 @@ +use std::pin::Pin; +use std::task::{Context, Poll}; + +use pin_project_lite::pin_project; + +use async_std::prelude::*; +use async_std::stream; +use async_std::sync::channel; +use async_std::task; + +#[test] +/// Checks that streams are merged fully even if one of the components +/// experiences delay. +fn merging_delayed_streams_work() { + let (sender, receiver) = channel::(10); + + let mut s = receiver.merge(stream::empty()); + let t = task::spawn(async move { + let mut xs = Vec::new(); + while let Some(x) = s.next().await { + xs.push(x); + } + xs + }); + + task::block_on(async move { + task::sleep(std::time::Duration::from_millis(500)).await; + sender.send(92).await; + drop(sender); + let xs = t.await; + assert_eq!(xs, vec![92]) + }); + + let (sender, receiver) = channel::(10); + + let mut s = stream::empty().merge(receiver); + let t = task::spawn(async move { + let mut xs = Vec::new(); + while let Some(x) = s.next().await { + xs.push(x); + } + xs + }); + + task::block_on(async move { + task::sleep(std::time::Duration::from_millis(500)).await; + sender.send(92).await; + drop(sender); + let xs = t.await; + assert_eq!(xs, vec![92]) + }); +} + +pin_project! { + /// The opposite of `Fuse`: makes the stream panic if polled after termination. + struct Explode { + #[pin] + done: bool, + #[pin] + inner: S, + } +} + +impl Stream for Explode { + type Item = S::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + if *this.done { + panic!("KABOOM!") + } + let res = this.inner.poll_next(cx); + if let Poll::Ready(None) = &res { + *this.done = true; + } + res + } +} + +fn explode(s: S) -> Explode { + Explode { + done: false, + inner: s, + } +} + +#[test] +fn merge_works_with_unfused_streams() { + let s1 = explode(stream::once(92)); + let s2 = explode(stream::once(92)); + let mut s = s1.merge(s2); + let xs = task::block_on(async move { + let mut xs = Vec::new(); + while let Some(x) = s.next().await { + xs.push(x) + } + xs + }); + assert_eq!(xs, vec![92, 92]); +}