diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 8ddab212..54443125 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -33,6 +33,7 @@ mod min_by; mod next; mod nth; mod scan; +mod skip_while; mod take; mod zip; @@ -51,6 +52,7 @@ use fold::FoldFuture; use min_by::MinByFuture; use next::NextFuture; use nth::NthFuture; +use skip_while::SkipWhile; use std::cmp::Ordering; use std::marker::PhantomData; @@ -661,6 +663,38 @@ pub trait Stream { Scan::new(self, initial_state, f) } + /// Combinator that `skip`s elements based on a predicate. + /// + /// Takes a closure argument. It will call this closure on every element in + /// the stream and ignore elements until it returns `false`. + /// + /// After `false` is returned, `SkipWhile`'s job is over and all further + /// elements in the strem are yeilded. + /// + /// ## Examples + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use std::collections::VecDeque; + /// use async_std::stream::Stream; + /// + /// let a: VecDeque<_> = vec![-1i32, 0, 1].into_iter().collect(); + /// let mut s = a.skip_while(|x| x.is_negative()); + /// + /// assert_eq!(s.next().await, Some(0)); + /// assert_eq!(s.next().await, Some(1)); + /// assert_eq!(s.next().await, None); + /// # + /// # }) } + /// ``` + fn skip_while
(self, predicate: P) -> SkipWhile ,
+ __t: PhantomData );
+
+ pub(crate) fn new(stream: S, predicate: P) -> Self {
+ SkipWhile {
+ stream,
+ predicate: Some(predicate),
+ __t: PhantomData,
+ }
+ }
+}
+
+impl {
+ stream: S,
+ predicate: Option SkipWhile {
+ pin_utils::unsafe_pinned!(stream: S);
+ pin_utils::unsafe_unpinned!(predicate: Option Stream for SkipWhile
+where
+ S: Stream,
+ P: FnMut(&S::Item) -> bool,
+{
+ type Item = S::Item;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll