diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 3146bd3..f749ebe 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -25,7 +25,7 @@ pub use double_ended_stream::DoubleEndedStream; pub use empty::{empty, Empty}; pub use once::{once, Once}; pub use repeat::{repeat, Repeat}; -pub use stream::{Scan, Stream, Take}; +pub use stream::{Scan, Stream, Take, Zip}; mod double_ended_stream; mod empty; diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index b0500c6..dacc3f0 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -31,9 +31,11 @@ mod next; mod nth; mod scan; mod take; +mod zip; pub use scan::Scan; pub use take::Take; +pub use zip::Zip; use all::AllFuture; use any::AnyFuture; @@ -545,6 +547,49 @@ pub trait Stream { { Scan::new(self, initial_state, f) } + + /// 'Zips up' two streams into a single stream of pairs. + /// + /// `zip()` returns a new stream that will iterate over two other streams, returning a tuple + /// where the first element comes from the first stream, and the second element comes from the + /// second stream. + /// + /// In other words, it zips two streams together, into a single one. + /// + /// If either stream returns [`None`], [`poll_next`] from the zipped stream will return + /// [`None`]. If the first stream returns [`None`], `zip` will short-circuit and `poll_next` + /// will not be called on the second stream. + /// + /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + /// [`poll_next`]: #tymethod.poll_next + /// + /// ## Examples + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use std::collections::VecDeque; + /// use async_std::stream::Stream; + /// + /// let l: VecDeque = vec![1, 2, 3].into_iter().collect(); + /// let r: VecDeque = vec![4, 5, 6, 7].into_iter().collect(); + /// let mut s = l.zip(r); + /// + /// assert_eq!(s.next().await, Some((1, 4))); + /// assert_eq!(s.next().await, Some((2, 5))); + /// assert_eq!(s.next().await, Some((3, 6))); + /// assert_eq!(s.next().await, None); + /// # + /// # }) } + /// ``` + #[inline] + fn zip(self, other: U) -> Zip + where + Self: Sized, + U: Stream, + { + Zip::new(self, other) + } } impl Stream for T { diff --git a/src/stream/stream/zip.rs b/src/stream/stream/zip.rs new file mode 100644 index 0000000..8f7c9ab --- /dev/null +++ b/src/stream/stream/zip.rs @@ -0,0 +1,54 @@ +use std::fmt; +use std::pin::Pin; + +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +/// An iterator that iterates two other iterators simultaneously. +pub struct Zip { + item_slot: Option, + first: A, + second: B, +} + +impl fmt::Debug for Zip { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Zip") + .field("first", &self.first) + .field("second", &self.second) + .finish() + } +} + +impl Unpin for Zip {} + +impl Zip { + pub(crate) fn new(first: A, second: B) -> Self { + Zip { + item_slot: None, + first, + second, + } + } + + pin_utils::unsafe_unpinned!(item_slot: Option); + pin_utils::unsafe_pinned!(first: A); + pin_utils::unsafe_pinned!(second: B); +} + +impl futures_core::stream::Stream for Zip { + type Item = (A::Item, B::Item); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.as_mut().item_slot().is_none() { + match self.as_mut().first().poll_next(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => return Poll::Ready(None), + Poll::Ready(Some(item)) => *self.as_mut().item_slot() = Some(item), + } + } + let second_item = futures_core::ready!(self.as_mut().second().poll_next(cx)); + let first_item = self.as_mut().item_slot().take().unwrap(); + Poll::Ready(second_item.map(|second_item| (first_item, second_item))) + } +}