Merge pull request #321 from async-rs/stream_merge

rename stream::join to Stream::merge
yoshuawuyts-patch-1
Yoshua Wuyts 5 years ago committed by GitHub
commit 454018ef42
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -46,8 +46,6 @@ cfg_if! {
pub use from_stream::FromStream; pub use from_stream::FromStream;
pub use into_stream::IntoStream; pub use into_stream::IntoStream;
#[cfg_attr(feature = "docs", doc(cfg(unstable)))] pub use stream::Merge;
#[doc(inline)]
pub use async_macros::{join_stream as join, JoinStream as Join};
} }
} }

@ -0,0 +1,42 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use futures_core::Stream;
/// A stream that merges two other streams into a single stream.
///
/// This stream is returned by [`Stream::merge`].
///
/// [`Stream::merge`]: trait.Stream.html#method.merge
#[derive(Debug)]
pub struct Merge<L, R> {
left: L,
right: R,
}
impl<L, R> Unpin for Merge<L, R> {}
impl<L, R> Merge<L, R> {
pub(crate) fn new(left: L, right: R) -> Self {
Self { left, right }
}
}
impl<L, R, T> Stream for Merge<L, R>
where
L: Stream<Item = T> + Unpin,
R: Stream<Item = T> + Unpin,
{
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Poll::Ready(Some(item)) = Pin::new(&mut self.left).poll_next(cx) {
// The first stream made progress. The Merge needs to be polled
// again to check the progress of the second stream.
cx.waker().wake_by_ref();
Poll::Ready(Some(item))
} else {
Pin::new(&mut self.right).poll_next(cx)
}
}
}

@ -87,10 +87,14 @@ cfg_if! {
cfg_if! { cfg_if! {
if #[cfg(any(feature = "unstable", feature = "docs"))] { if #[cfg(any(feature = "unstable", feature = "docs"))] {
mod merge;
use std::pin::Pin; use std::pin::Pin;
use crate::future::Future; use crate::future::Future;
use crate::stream::FromStream; use crate::stream::FromStream;
pub use merge::Merge;
} }
} }
@ -1147,6 +1151,42 @@ extension_trait! {
{ {
FromStream::from_stream(self) FromStream::from_stream(self)
} }
#[doc = r#"
Combines multiple streams into a single stream of all their outputs.
Items are yielded as soon as they're received, and the stream continues yield until both
streams have been exhausted.
# Examples
```
# async_std::task::block_on(async {
use async_std::prelude::*;
use async_std::stream;
let a = stream::once(1u8);
let b = stream::once(2u8);
let c = stream::once(3u8);
let mut s = a.merge(b).merge(c);
assert_eq!(s.next().await, Some(1u8));
assert_eq!(s.next().await, Some(2u8));
assert_eq!(s.next().await, Some(3u8));
assert_eq!(s.next().await, None);
# });
```
"#]
#[cfg(any(feature = "unstable", feature = "docs"))]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn merge<U>(self, other: U) -> Merge<Self, U>
where
Self: Sized,
U: Stream<Item = Self::Item> + Sized,
{
Merge::new(self, other)
}
} }
impl<S: Stream + Unpin + ?Sized> Stream for Box<S> { impl<S: Stream + Unpin + ?Sized> Stream for Box<S> {

Loading…
Cancel
Save