diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index e4c29bc4..f126455d 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -37,6 +37,7 @@ mod nth; mod scan; mod skip; mod skip_while; +mod step_by; mod take; mod zip; @@ -46,6 +47,7 @@ pub use inspect::Inspect; pub use scan::Scan; pub use skip::Skip; pub use skip_while::SkipWhile; +pub use step_by::StepBy; pub use take::Take; pub use zip::Zip; @@ -232,6 +234,40 @@ pub trait Stream { } } + /// Creates a stream that yields each `step`th element. + /// + /// # Panics + /// + /// This method will panic if the given step is `0`. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use std::collections::VecDeque; + /// + /// let s: VecDeque<_> = vec![0u8, 1, 2, 3, 4].into_iter().collect(); + /// let mut stepped = s.step_by(2); + /// + /// assert_eq!(stepped.next().await, Some(0)); + /// assert_eq!(stepped.next().await, Some(2)); + /// assert_eq!(stepped.next().await, Some(4)); + /// assert_eq!(stepped.next().await, None); + /// + /// # + /// # }) } + /// ``` + fn step_by(self, step: usize) -> StepBy + where + Self: Sized, + { + StepBy::new(self, step) + } + /// Creates a stream that gives the current element's count as well as the next value. /// /// # Overflow behaviour. diff --git a/src/stream/stream/step_by.rs b/src/stream/stream/step_by.rs new file mode 100644 index 00000000..f84feecd --- /dev/null +++ b/src/stream/stream/step_by.rs @@ -0,0 +1,50 @@ +use std::pin::Pin; + +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +/// A stream that steps a given amount of elements of another stream. +#[derive(Debug)] +pub struct StepBy { + stream: S, + step: usize, + i: usize, +} + +impl StepBy { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(step: usize); + pin_utils::unsafe_unpinned!(i: usize); + + pub(crate) fn new(stream: S, step: usize) -> Self { + StepBy { + stream, + step: step.checked_sub(1).unwrap(), + i: 0, + } + } +} + +impl Stream for StepBy +where + S: Stream, +{ + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + + match next { + Some(v) => match self.i { + 0 => { + *self.as_mut().i() = self.step; + return Poll::Ready(Some(v)); + } + _ => *self.as_mut().i() -= 1, + }, + None => return Poll::Ready(None), + } + } + } +}