diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 3d1b103..359cb7a 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -46,8 +46,6 @@ cfg_if! { pub use from_stream::FromStream; pub use into_stream::IntoStream; - #[cfg_attr(feature = "docs", doc(cfg(unstable)))] - #[doc(inline)] - pub use async_macros::{join_stream as join, JoinStream as Join}; + pub use stream::Merge; } } diff --git a/src/stream/stream/merge.rs b/src/stream/stream/merge.rs new file mode 100644 index 0000000..5e7b226 --- /dev/null +++ b/src/stream/stream/merge.rs @@ -0,0 +1,42 @@ +use std::pin::Pin; +use std::task::{Context, Poll}; + +use futures_core::Stream; + +/// A stream that merges two other streams into a single stream. +/// +/// This stream is returned by [`Stream::merge`]. +/// +/// [`Stream::merge`]: trait.Stream.html#method.merge +#[derive(Debug)] +pub struct Merge { + left: L, + right: R, +} + +impl Unpin for Merge {} + +impl Merge { + pub(crate) fn new(left: L, right: R) -> Self { + Self { left, right } + } +} + +impl Stream for Merge +where + L: Stream + Unpin, + R: Stream + Unpin, +{ + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if let Poll::Ready(Some(item)) = Pin::new(&mut self.left).poll_next(cx) { + // The first stream made progress. The Merge needs to be polled + // again to check the progress of the second stream. + cx.waker().wake_by_ref(); + Poll::Ready(Some(item)) + } else { + Pin::new(&mut self.right).poll_next(cx) + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 1bb0d31..054d81b 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -87,10 +87,14 @@ cfg_if! { cfg_if! { if #[cfg(any(feature = "unstable", feature = "docs"))] { + mod merge; + use std::pin::Pin; use crate::future::Future; use crate::stream::FromStream; + + pub use merge::Merge; } } @@ -1147,6 +1151,42 @@ extension_trait! { { FromStream::from_stream(self) } + + #[doc = r#" + Combines multiple streams into a single stream of all their outputs. + + Items are yielded as soon as they're received, and the stream continues yield until both + streams have been exhausted. + + # Examples + + ``` + # async_std::task::block_on(async { + use async_std::prelude::*; + use async_std::stream; + + let a = stream::once(1u8); + let b = stream::once(2u8); + let c = stream::once(3u8); + + let mut s = a.merge(b).merge(c); + + assert_eq!(s.next().await, Some(1u8)); + assert_eq!(s.next().await, Some(2u8)); + assert_eq!(s.next().await, Some(3u8)); + assert_eq!(s.next().await, None); + # }); + ``` + "#] + #[cfg(any(feature = "unstable", feature = "docs"))] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + fn merge(self, other: U) -> Merge + where + Self: Sized, + U: Stream + Sized, + { + Merge::new(self, other) + } } impl Stream for Box {