2
0
Fork 1
mirror of https://github.com/async-rs/async-std.git synced 2025-02-19 10:49:41 +00:00

Merge branch 'master' into fs-stream-step-by

This commit is contained in:
Fedor Sakharov 2019-09-21 19:07:27 +03:00
commit 376049b51d
No known key found for this signature in database
GPG key ID: 93D436E666BF0FEE
2 changed files with 84 additions and 1 deletions

View file

@ -35,6 +35,7 @@ mod next;
mod nth;
mod scan;
mod skip;
mod skip_while;
mod step_by;
mod take;
mod zip;
@ -43,6 +44,7 @@ pub use filter::Filter;
pub use fuse::Fuse;
pub use scan::Scan;
pub use skip::Skip;
pub use skip_while::SkipWhile;
pub use step_by::StepBy;
pub use take::Take;
pub use zip::Zip;
@ -729,7 +731,13 @@ pub trait Stream {
Scan::new(self, initial_state, f)
}
/// Creates a combinator that skips the first `n` elements.
/// Combinator that `skip`s elements based on a predicate.
///
/// Takes a closure argument. It will call this closure on every element in
/// the stream and ignore elements until it returns `false`.
///
/// After `false` is returned, `SkipWhile`'s job is over and all further
/// elements in the strem are yeilded.
///
/// ## Examples
///
@ -739,6 +747,27 @@ pub trait Stream {
/// use std::collections::VecDeque;
/// use async_std::stream::Stream;
///
/// let a: VecDeque<_> = vec![-1i32, 0, 1].into_iter().collect();
/// let mut s = a.skip_while(|x| x.is_negative());
///
/// assert_eq!(s.next().await, Some(0));
/// assert_eq!(s.next().await, Some(1));
/// assert_eq!(s.next().await, None);
/// #
/// # }) }
/// ```
fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P, Self::Item>
where
Self: Sized,
P: FnMut(&Self::Item) -> bool,
{
SkipWhile::new(self, predicate)
}
/// Creates a combinator that skips the first `n` elements.
///
/// ## Examples
///
/// let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
/// let mut skipped = s.skip(2);
///

View file

@ -0,0 +1,54 @@
use std::marker::PhantomData;
use std::pin::Pin;
use crate::stream::Stream;
use crate::task::{Context, Poll};
/// A stream to skip elements of another stream based on a predicate.
#[derive(Debug)]
pub struct SkipWhile<S, P, T> {
stream: S,
predicate: Option<P>,
__t: PhantomData<T>,
}
impl<S, P, T> SkipWhile<S, P, T> {
pin_utils::unsafe_pinned!(stream: S);
pin_utils::unsafe_unpinned!(predicate: Option<P>);
pub(crate) fn new(stream: S, predicate: P) -> Self {
SkipWhile {
stream,
predicate: Some(predicate),
__t: PhantomData,
}
}
}
impl<S, P> Stream for SkipWhile<S, P, S::Item>
where
S: Stream,
P: FnMut(&S::Item) -> bool,
{
type Item = S::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
match next {
Some(v) => match self.as_mut().predicate() {
Some(p) => match p(&v) {
true => (),
false => {
*self.as_mut().predicate() = None;
return Poll::Ready(Some(v));
}
},
None => return Poll::Ready(Some(v)),
},
None => return Poll::Ready(None),
}
}
}
}