From f9f97c43c44f180957b9a9bc619c0dea0d0a3de3 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Sat, 21 Sep 2019 15:10:00 +0300 Subject: [PATCH] adds stream::skip_while combinator --- src/stream/stream/mod.rs | 34 +++++++++++++++++++++ src/stream/stream/skip_while.rs | 54 +++++++++++++++++++++++++++++++++ 2 files changed, 88 insertions(+) create mode 100644 src/stream/stream/skip_while.rs diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 8ddab212..54443125 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -33,6 +33,7 @@ mod min_by; mod next; mod nth; mod scan; +mod skip_while; mod take; mod zip; @@ -51,6 +52,7 @@ use fold::FoldFuture; use min_by::MinByFuture; use next::NextFuture; use nth::NthFuture; +use skip_while::SkipWhile; use std::cmp::Ordering; use std::marker::PhantomData; @@ -661,6 +663,38 @@ pub trait Stream { Scan::new(self, initial_state, f) } + /// Combinator that `skip`s elements based on a predicate. + /// + /// Takes a closure argument. It will call this closure on every element in + /// the stream and ignore elements until it returns `false`. + /// + /// After `false` is returned, `SkipWhile`'s job is over and all further + /// elements in the strem are yeilded. + /// + /// ## Examples + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use std::collections::VecDeque; + /// use async_std::stream::Stream; + /// + /// let a: VecDeque<_> = vec![-1i32, 0, 1].into_iter().collect(); + /// let mut s = a.skip_while(|x| x.is_negative()); + /// + /// assert_eq!(s.next().await, Some(0)); + /// assert_eq!(s.next().await, Some(1)); + /// assert_eq!(s.next().await, None); + /// # + /// # }) } + /// ``` + fn skip_while

(self, predicate: P) -> SkipWhile + where + Self: Sized, + P: FnMut(&Self::Item) -> bool, + { + SkipWhile::new(self, predicate) + } + /// 'Zips up' two streams into a single stream of pairs. /// /// `zip()` returns a new stream that will iterate over two other streams, returning a tuple diff --git a/src/stream/stream/skip_while.rs b/src/stream/stream/skip_while.rs new file mode 100644 index 00000000..59f564a2 --- /dev/null +++ b/src/stream/stream/skip_while.rs @@ -0,0 +1,54 @@ +use std::marker::PhantomData; +use std::pin::Pin; + +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct SkipWhile { + stream: S, + predicate: Option

, + __t: PhantomData, +} + +impl SkipWhile { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(predicate: Option

); + + pub(crate) fn new(stream: S, predicate: P) -> Self { + SkipWhile { + stream, + predicate: Some(predicate), + __t: PhantomData, + } + } +} + +impl Stream for SkipWhile +where + S: Stream, + P: FnMut(&S::Item) -> bool, +{ + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + + match next { + Some(v) => match self.as_mut().predicate() { + Some(p) => match p(&v) { + true => (), + false => { + *self.as_mut().predicate() = None; + return Poll::Ready(Some(v)); + } + }, + None => return Poll::Ready(Some(v)), + }, + None => return Poll::Ready(None), + } + } + } +}