rename stream::join to Stream::merge

Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com>
yoshuawuyts-patch-1
Yoshua Wuyts 5 years ago
parent a2baa1d8e0
commit 84a148ddae
No known key found for this signature in database
GPG Key ID: 24EA8164F96777ED

@ -45,9 +45,5 @@ cfg_if! {
pub use extend::Extend; pub use extend::Extend;
pub use from_stream::FromStream; pub use from_stream::FromStream;
pub use into_stream::IntoStream; pub use into_stream::IntoStream;
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[doc(inline)]
pub use async_macros::{join_stream as merge, JoinStream as Merge};
} }
} }

@ -0,0 +1,42 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use futures_core::Stream;
/// A stream merging two streams.
///
/// This stream is returned by [`Stream::merge`].
///
/// [`Stream::merge`]:
#[derive(Debug)]
pub struct Merge<L, R> {
left: L,
right: R,
}
impl<L, R> Unpin for Merge<L, R> {}
impl<L, R> Merge<L, R> {
pub(crate) fn new(left: L, right: R) -> Self {
Self { left, right }
}
}
impl<L, R, T> Stream for Merge<L, R>
where
L: Stream<Item = T> + Unpin,
R: Stream<Item = T> + Unpin,
{
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Poll::Ready(Some(item)) = Pin::new(&mut self.left).poll_next(cx) {
// The first stream made progress. The Merge needs to be polled
// again to check the progress of the second stream.
cx.waker().wake_by_ref();
Poll::Ready(Some(item))
} else {
Pin::new(&mut self.right).poll_next(cx)
}
}
}

@ -34,6 +34,7 @@ mod for_each;
mod fuse; mod fuse;
mod inspect; mod inspect;
mod map; mod map;
mod merge;
mod min_by; mod min_by;
mod next; mod next;
mod nth; mod nth;
@ -91,6 +92,8 @@ cfg_if! {
use crate::future::Future; use crate::future::Future;
use crate::stream::FromStream; use crate::stream::FromStream;
pub use merge::Merge;
} }
} }
@ -1147,6 +1150,41 @@ extension_trait! {
{ {
FromStream::from_stream(self) FromStream::from_stream(self)
} }
#[doc = r#"
Combines multiple streams into a single stream of all their outputs.
This macro is only usable inside of async functions, closures, and blocks.
# Examples
```
# async_std::task::block_on(async {
use async_std::prelude::*;
use async_std::stream;
let a = stream::once(1u8);
let b = stream::once(2u8);
let c = stream::once(3u8);
let mut s = a.merge(b).merge(c);
assert_eq!(s.next().await, Some(1u8));
assert_eq!(s.next().await, Some(2u8));
assert_eq!(s.next().await, Some(3u8));
assert_eq!(s.next().await, None);
# });
```
"#]
#[cfg(any(feature = "unstable", feature = "docs"))]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn merge<U>(self, other: U) -> Merge<Self, U>
where
Self: Sized,
U: Stream<Item = Self::Item> + Sized,
{
Merge::new(self, other)
}
} }
impl<S: Stream + Unpin + ?Sized> Stream for Box<S> { impl<S: Stream + Unpin + ?Sized> Stream for Box<S> {

Loading…
Cancel
Save