From e74c0cec1f3dd84e2857d12109b7e6d234386fdc Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Sat, 21 Sep 2019 17:37:25 +0300 Subject: [PATCH] adds stream::step_by combinator --- src/stream/stream/mod.rs | 36 ++++++++++++++++++++++++++ src/stream/stream/step_by.rs | 50 ++++++++++++++++++++++++++++++++++++ 2 files changed, 86 insertions(+) create mode 100644 src/stream/stream/step_by.rs diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 429241d..754dc19 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -35,6 +35,7 @@ mod next; mod nth; mod scan; mod skip; +mod step_by; mod take; mod zip; @@ -42,6 +43,7 @@ pub use filter::Filter; pub use fuse::Fuse; pub use scan::Scan; pub use skip::Skip; +pub use step_by::StepBy; pub use take::Take; pub use zip::Zip; @@ -228,6 +230,40 @@ pub trait Stream { } } + /// Creates a stream that yields each `step`th element. + /// + /// # Panics + /// + /// This method will panic if the given step is `0`. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use std::collections::VecDeque; + /// + /// let s: VecDeque<_> = vec![0u8, 1, 2, 3, 4].into_iter().collect(); + /// let mut stepped = s.step_by(2); + /// + /// assert_eq!(stepped.next().await, Some(0)); + /// assert_eq!(stepped.next().await, Some(2)); + /// assert_eq!(stepped.next().await, Some(4)); + /// assert_eq!(stepped.next().await, None); + /// + /// # + /// # }) } + /// ``` + fn step_by(self, step: usize) -> StepBy + where + Self: Sized, + { + StepBy::new(self, step) + } + /// Creates a stream that gives the current element's count as well as the next value. /// /// # Overflow behaviour. diff --git a/src/stream/stream/step_by.rs b/src/stream/stream/step_by.rs new file mode 100644 index 0000000..f84feec --- /dev/null +++ b/src/stream/stream/step_by.rs @@ -0,0 +1,50 @@ +use std::pin::Pin; + +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +/// A stream that steps a given amount of elements of another stream. +#[derive(Debug)] +pub struct StepBy { + stream: S, + step: usize, + i: usize, +} + +impl StepBy { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(step: usize); + pin_utils::unsafe_unpinned!(i: usize); + + pub(crate) fn new(stream: S, step: usize) -> Self { + StepBy { + stream, + step: step.checked_sub(1).unwrap(), + i: 0, + } + } +} + +impl Stream for StepBy +where + S: Stream, +{ + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + + match next { + Some(v) => match self.i { + 0 => { + *self.as_mut().i() = self.step; + return Poll::Ready(Some(v)); + } + _ => *self.as_mut().i() -= 1, + }, + None => return Poll::Ready(None), + } + } + } +}