From ed944d051a9331b76357a7618ccf04c6e525b5a3 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Tue, 10 Sep 2019 20:39:45 +0300 Subject: [PATCH] adds stream::enumerate combinator --- src/stream/stream/enumerate.rs | 38 ++++++++++++++++++++++++++++++++++ src/stream/stream/mod.rs | 32 ++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+) create mode 100644 src/stream/stream/enumerate.rs diff --git a/src/stream/stream/enumerate.rs b/src/stream/stream/enumerate.rs new file mode 100644 index 0000000..de29578 --- /dev/null +++ b/src/stream/stream/enumerate.rs @@ -0,0 +1,38 @@ +use crate::task::{Context, Poll}; +use std::pin::Pin; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct Enumerate { + stream: S, + i: usize, +} + +impl Enumerate { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(i: usize); + + pub(super) fn new(stream: S) -> Self { + Enumerate { stream, i: 0 } + } +} + +impl futures_core::stream::Stream for Enumerate +where + S: futures_core::stream::Stream, +{ + type Item = (usize, S::Item); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + + match next { + Some(v) => { + let ret = (self.i, v); + *self.as_mut().i() += 1; + Poll::Ready(Some(ret)) + } + None => Poll::Ready(None), + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index eddafe2..4ca42c6 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -23,6 +23,7 @@ mod all; mod any; +mod enumerate; mod filter_map; mod find_map; mod min_by; @@ -34,6 +35,7 @@ pub use take::Take; use all::AllFuture; use any::AnyFuture; +use enumerate::Enumerate; use filter_map::FilterMap; use find_map::FindMapFuture; use min_by::MinByFuture; @@ -136,6 +138,36 @@ pub trait Stream { } } + /// Creates a stream that gives the current element's count as well as the next value. + /// + /// # Overflow behaviour. + /// + /// This combinator does no guarding against overflows. + /// + /// # Examples + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use async_std::stream; + /// + /// let mut s = stream::repeat(9).take(4).enumerate(); + /// let mut c: usize = 0; + /// + /// while let Some((i, v)) = s.next().await { + /// assert_eq!(c, i); + /// assert_eq!(v, 9); + /// c += 1; + /// } + /// # + /// # }) } + fn enumerate(self) -> Enumerate + where + Self: Sized, + { + Enumerate::new(self) + } + /// Both filters and maps a stream. /// /// # Examples