adds stream::skip_while combinator

staging
Fedor Sakharov 5 years ago
parent 55ea367415
commit f9f97c43c4
No known key found for this signature in database
GPG Key ID: 93D436E666BF0FEE

@ -33,6 +33,7 @@ mod min_by;
mod next; mod next;
mod nth; mod nth;
mod scan; mod scan;
mod skip_while;
mod take; mod take;
mod zip; mod zip;
@ -51,6 +52,7 @@ use fold::FoldFuture;
use min_by::MinByFuture; use min_by::MinByFuture;
use next::NextFuture; use next::NextFuture;
use nth::NthFuture; use nth::NthFuture;
use skip_while::SkipWhile;
use std::cmp::Ordering; use std::cmp::Ordering;
use std::marker::PhantomData; use std::marker::PhantomData;
@ -661,6 +663,38 @@ pub trait Stream {
Scan::new(self, initial_state, f) Scan::new(self, initial_state, f)
} }
/// Combinator that `skip`s elements based on a predicate.
///
/// Takes a closure argument. It will call this closure on every element in
/// the stream and ignore elements until it returns `false`.
///
/// After `false` is returned, `SkipWhile`'s job is over and all further
/// elements in the strem are yeilded.
///
/// ## Examples
/// ```
/// # fn main() { async_std::task::block_on(async {
/// #
/// use std::collections::VecDeque;
/// use async_std::stream::Stream;
///
/// let a: VecDeque<_> = vec![-1i32, 0, 1].into_iter().collect();
/// let mut s = a.skip_while(|x| x.is_negative());
///
/// assert_eq!(s.next().await, Some(0));
/// assert_eq!(s.next().await, Some(1));
/// assert_eq!(s.next().await, None);
/// #
/// # }) }
/// ```
fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P, Self::Item>
where
Self: Sized,
P: FnMut(&Self::Item) -> bool,
{
SkipWhile::new(self, predicate)
}
/// 'Zips up' two streams into a single stream of pairs. /// 'Zips up' two streams into a single stream of pairs.
/// ///
/// `zip()` returns a new stream that will iterate over two other streams, returning a tuple /// `zip()` returns a new stream that will iterate over two other streams, returning a tuple

@ -0,0 +1,54 @@
use std::marker::PhantomData;
use std::pin::Pin;
use crate::stream::Stream;
use crate::task::{Context, Poll};
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct SkipWhile<S, P, T> {
stream: S,
predicate: Option<P>,
__t: PhantomData<T>,
}
impl<S, P, T> SkipWhile<S, P, T> {
pin_utils::unsafe_pinned!(stream: S);
pin_utils::unsafe_unpinned!(predicate: Option<P>);
pub(crate) fn new(stream: S, predicate: P) -> Self {
SkipWhile {
stream,
predicate: Some(predicate),
__t: PhantomData,
}
}
}
impl<S, P> Stream for SkipWhile<S, P, S::Item>
where
S: Stream,
P: FnMut(&S::Item) -> bool,
{
type Item = S::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
match next {
Some(v) => match self.as_mut().predicate() {
Some(p) => match p(&v) {
true => (),
false => {
*self.as_mut().predicate() = None;
return Poll::Ready(Some(v));
}
},
None => return Poll::Ready(Some(v)),
},
None => return Poll::Ready(None),
}
}
}
}
Loading…
Cancel
Save