diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 754dc19..96e9b96 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -35,6 +35,7 @@ mod next; mod nth; mod scan; mod skip; +mod skip_while; mod step_by; mod take; mod zip; @@ -43,6 +44,7 @@ pub use filter::Filter; pub use fuse::Fuse; pub use scan::Scan; pub use skip::Skip; +pub use skip_while::SkipWhile; pub use step_by::StepBy; pub use take::Take; pub use zip::Zip; @@ -729,7 +731,13 @@ pub trait Stream { Scan::new(self, initial_state, f) } - /// Creates a combinator that skips the first `n` elements. + /// Combinator that `skip`s elements based on a predicate. + /// + /// Takes a closure argument. It will call this closure on every element in + /// the stream and ignore elements until it returns `false`. + /// + /// After `false` is returned, `SkipWhile`'s job is over and all further + /// elements in the strem are yeilded. /// /// ## Examples /// @@ -739,6 +747,27 @@ pub trait Stream { /// use std::collections::VecDeque; /// use async_std::stream::Stream; /// + /// let a: VecDeque<_> = vec![-1i32, 0, 1].into_iter().collect(); + /// let mut s = a.skip_while(|x| x.is_negative()); + /// + /// assert_eq!(s.next().await, Some(0)); + /// assert_eq!(s.next().await, Some(1)); + /// assert_eq!(s.next().await, None); + /// # + /// # }) } + /// ``` + fn skip_while

(self, predicate: P) -> SkipWhile + where + Self: Sized, + P: FnMut(&Self::Item) -> bool, + { + SkipWhile::new(self, predicate) + } + + /// Creates a combinator that skips the first `n` elements. + /// + /// ## Examples + /// /// let s: VecDeque = vec![1, 2, 3].into_iter().collect(); /// let mut skipped = s.skip(2); /// diff --git a/src/stream/stream/skip_while.rs b/src/stream/stream/skip_while.rs new file mode 100644 index 0000000..a54b051 --- /dev/null +++ b/src/stream/stream/skip_while.rs @@ -0,0 +1,54 @@ +use std::marker::PhantomData; +use std::pin::Pin; + +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +/// A stream to skip elements of another stream based on a predicate. +#[derive(Debug)] +pub struct SkipWhile { + stream: S, + predicate: Option

, + __t: PhantomData, +} + +impl SkipWhile { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(predicate: Option

); + + pub(crate) fn new(stream: S, predicate: P) -> Self { + SkipWhile { + stream, + predicate: Some(predicate), + __t: PhantomData, + } + } +} + +impl Stream for SkipWhile +where + S: Stream, + P: FnMut(&S::Item) -> bool, +{ + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + + match next { + Some(v) => match self.as_mut().predicate() { + Some(p) => match p(&v) { + true => (), + false => { + *self.as_mut().predicate() = None; + return Poll::Ready(Some(v)); + } + }, + None => return Poll::Ready(Some(v)), + }, + None => return Poll::Ready(None), + } + } + } +}