204: Add Stream::zip r=stjepang a=tirr-c



Co-authored-by: Wonwoo Choi <chwo9843@gmail.com>
staging
bors[bot] 5 years ago committed by GitHub
commit 265f1ff8eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -25,7 +25,7 @@ pub use double_ended_stream::DoubleEndedStream;
pub use empty::{empty, Empty};
pub use once::{once, Once};
pub use repeat::{repeat, Repeat};
pub use stream::{Scan, Stream, Take};
pub use stream::{Scan, Stream, Take, Zip};
mod double_ended_stream;
mod empty;

@ -31,9 +31,11 @@ mod next;
mod nth;
mod scan;
mod take;
mod zip;
pub use scan::Scan;
pub use take::Take;
pub use zip::Zip;
use all::AllFuture;
use any::AnyFuture;
@ -545,6 +547,49 @@ pub trait Stream {
{
Scan::new(self, initial_state, f)
}
/// 'Zips up' two streams into a single stream of pairs.
///
/// `zip()` returns a new stream that will iterate over two other streams, returning a tuple
/// where the first element comes from the first stream, and the second element comes from the
/// second stream.
///
/// In other words, it zips two streams together, into a single one.
///
/// If either stream returns [`None`], [`poll_next`] from the zipped stream will return
/// [`None`]. If the first stream returns [`None`], `zip` will short-circuit and `poll_next`
/// will not be called on the second stream.
///
/// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
/// [`poll_next`]: #tymethod.poll_next
///
/// ## Examples
///
/// ```
/// # fn main() { async_std::task::block_on(async {
/// #
/// use std::collections::VecDeque;
/// use async_std::stream::Stream;
///
/// let l: VecDeque<isize> = vec![1, 2, 3].into_iter().collect();
/// let r: VecDeque<isize> = vec![4, 5, 6, 7].into_iter().collect();
/// let mut s = l.zip(r);
///
/// assert_eq!(s.next().await, Some((1, 4)));
/// assert_eq!(s.next().await, Some((2, 5)));
/// assert_eq!(s.next().await, Some((3, 6)));
/// assert_eq!(s.next().await, None);
/// #
/// # }) }
/// ```
#[inline]
fn zip<U>(self, other: U) -> Zip<Self, U>
where
Self: Sized,
U: Stream,
{
Zip::new(self, other)
}
}
impl<T: futures_core::stream::Stream + ?Sized> Stream for T {

@ -0,0 +1,54 @@
use std::fmt;
use std::pin::Pin;
use crate::stream::Stream;
use crate::task::{Context, Poll};
/// An iterator that iterates two other iterators simultaneously.
pub struct Zip<A: Stream, B> {
item_slot: Option<A::Item>,
first: A,
second: B,
}
impl<A: fmt::Debug + Stream, B: fmt::Debug> fmt::Debug for Zip<A, B> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Zip")
.field("first", &self.first)
.field("second", &self.second)
.finish()
}
}
impl<A: Unpin + Stream, B: Unpin> Unpin for Zip<A, B> {}
impl<A: Stream, B> Zip<A, B> {
pub(crate) fn new(first: A, second: B) -> Self {
Zip {
item_slot: None,
first,
second,
}
}
pin_utils::unsafe_unpinned!(item_slot: Option<A::Item>);
pin_utils::unsafe_pinned!(first: A);
pin_utils::unsafe_pinned!(second: B);
}
impl<A: Stream, B: Stream> futures_core::stream::Stream for Zip<A, B> {
type Item = (A::Item, B::Item);
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.as_mut().item_slot().is_none() {
match self.as_mut().first().poll_next(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => return Poll::Ready(None),
Poll::Ready(Some(item)) => *self.as_mut().item_slot() = Some(item),
}
}
let second_item = futures_core::ready!(self.as_mut().second().poll_next(cx));
let first_item = self.as_mut().item_slot().take().unwrap();
Poll::Ready(second_item.map(|second_item| (first_item, second_item)))
}
}
Loading…
Cancel
Save