diff --git a/src/stream/mod.rs b/src/stream/mod.rs index d165e87..2e35f2d 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -45,9 +45,5 @@ cfg_if! { pub use extend::Extend; pub use from_stream::FromStream; pub use into_stream::IntoStream; - - #[cfg_attr(feature = "docs", doc(cfg(unstable)))] - #[doc(inline)] - pub use async_macros::{join_stream as merge, JoinStream as Merge}; } } diff --git a/src/stream/stream/merge.rs b/src/stream/stream/merge.rs new file mode 100644 index 0000000..6c213f2 --- /dev/null +++ b/src/stream/stream/merge.rs @@ -0,0 +1,42 @@ +use std::pin::Pin; +use std::task::{Context, Poll}; + +use futures_core::Stream; + +/// A stream merging two streams. +/// +/// This stream is returned by [`Stream::merge`]. +/// +/// [`Stream::merge`]: +#[derive(Debug)] +pub struct Merge { + left: L, + right: R, +} + +impl Unpin for Merge {} + +impl Merge { + pub(crate) fn new(left: L, right: R) -> Self { + Self { left, right } + } +} + +impl Stream for Merge +where + L: Stream + Unpin, + R: Stream + Unpin, +{ + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if let Poll::Ready(Some(item)) = Pin::new(&mut self.left).poll_next(cx) { + // The first stream made progress. The Merge needs to be polled + // again to check the progress of the second stream. + cx.waker().wake_by_ref(); + Poll::Ready(Some(item)) + } else { + Pin::new(&mut self.right).poll_next(cx) + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 1bb0d31..22ceabb 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -34,6 +34,7 @@ mod for_each; mod fuse; mod inspect; mod map; +mod merge; mod min_by; mod next; mod nth; @@ -91,6 +92,8 @@ cfg_if! { use crate::future::Future; use crate::stream::FromStream; + + pub use merge::Merge; } } @@ -1147,6 +1150,41 @@ extension_trait! { { FromStream::from_stream(self) } + + #[doc = r#" + Combines multiple streams into a single stream of all their outputs. + + This macro is only usable inside of async functions, closures, and blocks. + + # Examples + + ``` + # async_std::task::block_on(async { + use async_std::prelude::*; + use async_std::stream; + + let a = stream::once(1u8); + let b = stream::once(2u8); + let c = stream::once(3u8); + + let mut s = a.merge(b).merge(c); + + assert_eq!(s.next().await, Some(1u8)); + assert_eq!(s.next().await, Some(2u8)); + assert_eq!(s.next().await, Some(3u8)); + assert_eq!(s.next().await, None); + # }); + ``` + "#] + #[cfg(any(feature = "unstable", feature = "docs"))] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + fn merge(self, other: U) -> Merge + where + Self: Sized, + U: Stream + Sized, + { + Merge::new(self, other) + } } impl Stream for Box {