diff --git a/src/stream/stream/enumerate.rs b/src/stream/stream/enumerate.rs new file mode 100644 index 0000000..a29fc80 --- /dev/null +++ b/src/stream/stream/enumerate.rs @@ -0,0 +1,40 @@ +use crate::task::{Context, Poll}; +use std::pin::Pin; + +use crate::stream::Stream; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct Enumerate { + stream: S, + i: usize, +} + +impl Enumerate { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(i: usize); + + pub(super) fn new(stream: S) -> Self { + Enumerate { stream, i: 0 } + } +} + +impl futures_core::stream::Stream for Enumerate +where + S: Stream, +{ + type Item = (usize, S::Item); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + + match next { + Some(v) => { + let ret = (self.i, v); + *self.as_mut().i() += 1; + Poll::Ready(Some(ret)) + } + None => Poll::Ready(None), + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index dacc3f0..74bf740 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -23,6 +23,7 @@ mod all; mod any; +mod enumerate; mod filter_map; mod find; mod find_map; @@ -39,6 +40,7 @@ pub use zip::Zip; use all::AllFuture; use any::AnyFuture; +use enumerate::Enumerate; use filter_map::FilterMap; use find::FindFuture; use find_map::FindMapFuture; @@ -197,6 +199,36 @@ pub trait Stream { } } + /// Creates a stream that gives the current element's count as well as the next value. + /// + /// # Overflow behaviour. + /// + /// This combinator does no guarding against overflows. + /// + /// # Examples + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use std::collections::VecDeque; + /// + /// let s: VecDeque<_> = vec!['a', 'b', 'c'].into_iter().collect(); + /// let mut s = s.enumerate(); + /// + /// assert_eq!(s.next().await, Some((0, 'a'))); + /// assert_eq!(s.next().await, Some((1, 'b'))); + /// assert_eq!(s.next().await, Some((2, 'c'))); + /// assert_eq!(s.next().await, None); + /// + /// # + /// # }) } + fn enumerate(self) -> Enumerate + where + Self: Sized, + { + Enumerate::new(self) + } + /// Both filters and maps a stream. /// /// # Examples