From a2baa1d8e0991b786504f5f10bdc2e853c201773 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sun, 13 Oct 2019 21:12:05 +0200 Subject: [PATCH 1/4] rename stream::join to stream::merge Signed-off-by: Yoshua Wuyts --- src/stream/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 3d1b103..d165e87 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -48,6 +48,6 @@ cfg_if! { #[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[doc(inline)] - pub use async_macros::{join_stream as join, JoinStream as Join}; + pub use async_macros::{join_stream as merge, JoinStream as Merge}; } } From 84a148ddae8e967bd4167d2b64c5d065115783d3 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sun, 13 Oct 2019 21:48:53 +0200 Subject: [PATCH 2/4] rename stream::join to Stream::merge Signed-off-by: Yoshua Wuyts --- src/stream/mod.rs | 4 ---- src/stream/stream/merge.rs | 42 ++++++++++++++++++++++++++++++++++++++ src/stream/stream/mod.rs | 38 ++++++++++++++++++++++++++++++++++ 3 files changed, 80 insertions(+), 4 deletions(-) create mode 100644 src/stream/stream/merge.rs diff --git a/src/stream/mod.rs b/src/stream/mod.rs index d165e87..2e35f2d 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -45,9 +45,5 @@ cfg_if! { pub use extend::Extend; pub use from_stream::FromStream; pub use into_stream::IntoStream; - - #[cfg_attr(feature = "docs", doc(cfg(unstable)))] - #[doc(inline)] - pub use async_macros::{join_stream as merge, JoinStream as Merge}; } } diff --git a/src/stream/stream/merge.rs b/src/stream/stream/merge.rs new file mode 100644 index 0000000..6c213f2 --- /dev/null +++ b/src/stream/stream/merge.rs @@ -0,0 +1,42 @@ +use std::pin::Pin; +use std::task::{Context, Poll}; + +use futures_core::Stream; + +/// A stream merging two streams. +/// +/// This stream is returned by [`Stream::merge`]. +/// +/// [`Stream::merge`]: +#[derive(Debug)] +pub struct Merge { + left: L, + right: R, +} + +impl Unpin for Merge {} + +impl Merge { + pub(crate) fn new(left: L, right: R) -> Self { + Self { left, right } + } +} + +impl Stream for Merge +where + L: Stream + Unpin, + R: Stream + Unpin, +{ + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if let Poll::Ready(Some(item)) = Pin::new(&mut self.left).poll_next(cx) { + // The first stream made progress. The Merge needs to be polled + // again to check the progress of the second stream. + cx.waker().wake_by_ref(); + Poll::Ready(Some(item)) + } else { + Pin::new(&mut self.right).poll_next(cx) + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 1bb0d31..22ceabb 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -34,6 +34,7 @@ mod for_each; mod fuse; mod inspect; mod map; +mod merge; mod min_by; mod next; mod nth; @@ -91,6 +92,8 @@ cfg_if! { use crate::future::Future; use crate::stream::FromStream; + + pub use merge::Merge; } } @@ -1147,6 +1150,41 @@ extension_trait! { { FromStream::from_stream(self) } + + #[doc = r#" + Combines multiple streams into a single stream of all their outputs. + + This macro is only usable inside of async functions, closures, and blocks. + + # Examples + + ``` + # async_std::task::block_on(async { + use async_std::prelude::*; + use async_std::stream; + + let a = stream::once(1u8); + let b = stream::once(2u8); + let c = stream::once(3u8); + + let mut s = a.merge(b).merge(c); + + assert_eq!(s.next().await, Some(1u8)); + assert_eq!(s.next().await, Some(2u8)); + assert_eq!(s.next().await, Some(3u8)); + assert_eq!(s.next().await, None); + # }); + ``` + "#] + #[cfg(any(feature = "unstable", feature = "docs"))] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + fn merge(self, other: U) -> Merge + where + Self: Sized, + U: Stream + Sized, + { + Merge::new(self, other) + } } impl Stream for Box { From b601bcfcb870023f7fda353109fe8b50be81c718 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sun, 13 Oct 2019 21:55:19 +0200 Subject: [PATCH 3/4] polish Signed-off-by: Yoshua Wuyts --- src/stream/stream/merge.rs | 4 ++-- src/stream/stream/mod.rs | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/stream/stream/merge.rs b/src/stream/stream/merge.rs index 6c213f2..5e7b226 100644 --- a/src/stream/stream/merge.rs +++ b/src/stream/stream/merge.rs @@ -3,11 +3,11 @@ use std::task::{Context, Poll}; use futures_core::Stream; -/// A stream merging two streams. +/// A stream that merges two other streams into a single stream. /// /// This stream is returned by [`Stream::merge`]. /// -/// [`Stream::merge`]: +/// [`Stream::merge`]: trait.Stream.html#method.merge #[derive(Debug)] pub struct Merge { left: L, diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 22ceabb..cf0eea1 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -34,7 +34,6 @@ mod for_each; mod fuse; mod inspect; mod map; -mod merge; mod min_by; mod next; mod nth; @@ -88,6 +87,8 @@ cfg_if! { cfg_if! { if #[cfg(any(feature = "unstable", feature = "docs"))] { + mod merge; + use std::pin::Pin; use crate::future::Future; From 04342c7b5dd6af770b013935fc7bb7bbed2341d4 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sun, 13 Oct 2019 22:05:11 +0200 Subject: [PATCH 4/4] docs Signed-off-by: Yoshua Wuyts --- src/stream/mod.rs | 2 ++ src/stream/stream/mod.rs | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 2e35f2d..359cb7a 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -45,5 +45,7 @@ cfg_if! { pub use extend::Extend; pub use from_stream::FromStream; pub use into_stream::IntoStream; + + pub use stream::Merge; } } diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index cf0eea1..054d81b 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -1155,7 +1155,8 @@ extension_trait! { #[doc = r#" Combines multiple streams into a single stream of all their outputs. - This macro is only usable inside of async functions, closures, and blocks. + Items are yielded as soon as they're received, and the stream continues yield until both + streams have been exhausted. # Examples