From ed944d051a9331b76357a7618ccf04c6e525b5a3 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Tue, 10 Sep 2019 20:39:45 +0300 Subject: [PATCH 1/5] adds stream::enumerate combinator --- src/stream/stream/enumerate.rs | 38 ++++++++++++++++++++++++++++++++++ src/stream/stream/mod.rs | 32 ++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+) create mode 100644 src/stream/stream/enumerate.rs diff --git a/src/stream/stream/enumerate.rs b/src/stream/stream/enumerate.rs new file mode 100644 index 00000000..de295785 --- /dev/null +++ b/src/stream/stream/enumerate.rs @@ -0,0 +1,38 @@ +use crate::task::{Context, Poll}; +use std::pin::Pin; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct Enumerate { + stream: S, + i: usize, +} + +impl Enumerate { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(i: usize); + + pub(super) fn new(stream: S) -> Self { + Enumerate { stream, i: 0 } + } +} + +impl futures_core::stream::Stream for Enumerate +where + S: futures_core::stream::Stream, +{ + type Item = (usize, S::Item); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + + match next { + Some(v) => { + let ret = (self.i, v); + *self.as_mut().i() += 1; + Poll::Ready(Some(ret)) + } + None => Poll::Ready(None), + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index eddafe28..4ca42c6a 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -23,6 +23,7 @@ mod all; mod any; +mod enumerate; mod filter_map; mod find_map; mod min_by; @@ -34,6 +35,7 @@ pub use take::Take; use all::AllFuture; use any::AnyFuture; +use enumerate::Enumerate; use filter_map::FilterMap; use find_map::FindMapFuture; use min_by::MinByFuture; @@ -136,6 +138,36 @@ pub trait Stream { } } + /// Creates a stream that gives the current element's count as well as the next value. + /// + /// # Overflow behaviour. + /// + /// This combinator does no guarding against overflows. + /// + /// # Examples + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use async_std::stream; + /// + /// let mut s = stream::repeat(9).take(4).enumerate(); + /// let mut c: usize = 0; + /// + /// while let Some((i, v)) = s.next().await { + /// assert_eq!(c, i); + /// assert_eq!(v, 9); + /// c += 1; + /// } + /// # + /// # }) } + fn enumerate(self) -> Enumerate + where + Self: Sized, + { + Enumerate::new(self) + } + /// Both filters and maps a stream. /// /// # Examples From 2d75ffacc4cddad597c49007d1eefbd49b616dc7 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Wed, 11 Sep 2019 10:08:08 +0300 Subject: [PATCH 2/5] fixes example to resemble std more --- src/stream/stream/mod.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 4ca42c6a..3fe565bc 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -149,16 +149,15 @@ pub trait Stream { /// # fn main() { async_std::task::block_on(async { /// # /// use async_std::prelude::*; - /// use async_std::stream; + /// use std::collections::VecDeque; /// - /// let mut s = stream::repeat(9).take(4).enumerate(); - /// let mut c: usize = 0; + /// let s: VecDeque<_> = vec!['a', 'b', 'c'].into_iter().collect(); + /// let mut s = s.enumerate(); + /// + /// assert_eq!(s.next().await, Some((0, 'a'))); + /// assert_eq!(s.next().await, Some((1, 'b'))); + /// assert_eq!(s.next().await, Some((2, 'c'))); /// - /// while let Some((i, v)) = s.next().await { - /// assert_eq!(c, i); - /// assert_eq!(v, 9); - /// c += 1; - /// } /// # /// # }) } fn enumerate(self) -> Enumerate From cdd4215e8fdd0298c9346f9bcaf7b1dabc48e1d3 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Wed, 11 Sep 2019 10:09:52 +0300 Subject: [PATCH 3/5] forgot None case --- src/stream/stream/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 3fe565bc..68df3e72 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -157,6 +157,7 @@ pub trait Stream { /// assert_eq!(s.next().await, Some((0, 'a'))); /// assert_eq!(s.next().await, Some((1, 'b'))); /// assert_eq!(s.next().await, Some((2, 'c'))); + /// assert_eq!(s.next().await, None); /// /// # /// # }) } From 3dc33f54b49f5b3ccf129730e03ab8de39fcce9d Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Wed, 11 Sep 2019 22:03:44 +0300 Subject: [PATCH 4/5] fixes after #145 --- src/stream/stream/enumerate.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/stream/stream/enumerate.rs b/src/stream/stream/enumerate.rs index de295785..8576da82 100644 --- a/src/stream/stream/enumerate.rs +++ b/src/stream/stream/enumerate.rs @@ -1,6 +1,8 @@ use crate::task::{Context, Poll}; use std::pin::Pin; +use crate::stream::Stream; + #[doc(hidden)] #[allow(missing_debug_implementations)] pub struct Enumerate { @@ -19,7 +21,7 @@ impl Enumerate { impl futures_core::stream::Stream for Enumerate where - S: futures_core::stream::Stream, + S: Stream + Unpin + Sized, { type Item = (usize, S::Item); From 9487b73f12da598048a3aff840f9fe35f1987f5c Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Tue, 17 Sep 2019 12:31:24 +0300 Subject: [PATCH 5/5] Update src/stream/stream/enumerate.rs Co-Authored-By: Stjepan Glavina --- src/stream/stream/enumerate.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/enumerate.rs b/src/stream/stream/enumerate.rs index 8576da82..a29fc801 100644 --- a/src/stream/stream/enumerate.rs +++ b/src/stream/stream/enumerate.rs @@ -21,7 +21,7 @@ impl Enumerate { impl futures_core::stream::Stream for Enumerate where - S: Stream + Unpin + Sized, + S: Stream, { type Item = (usize, S::Item);