adds stream::filter combinator

staging
Fedor Sakharov 5 years ago
parent 55ea367415
commit e7ae10ebee
No known key found for this signature in database
GPG Key ID: 93D436E666BF0FEE

@ -0,0 +1,49 @@
use std::marker::PhantomData;
use std::pin::Pin;
use crate::stream::Stream;
use crate::task::{Context, Poll};
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct Filter<S, P, T> {
stream: S,
predicate: P,
__t: PhantomData<T>,
}
impl<S, P, T> Filter<S, P, T> {
pin_utils::unsafe_pinned!(stream: S);
pin_utils::unsafe_unpinned!(predicate: P);
pub(super) fn new(stream: S, predicate: P) -> Self {
Filter {
stream,
predicate,
__t: PhantomData,
}
}
}
impl<S, P> futures_core::stream::Stream for Filter<S, P, S::Item>
where
S: Stream,
P: FnMut(&S::Item) -> bool,
{
type Item = S::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
match next {
Some(v) => match (self.as_mut().predicate())(&v) {
true => Poll::Ready(Some(v)),
false => {
cx.waker().wake_by_ref();
Poll::Pending
}
},
None => Poll::Ready(None),
}
}
}

@ -24,6 +24,7 @@
mod all;
mod any;
mod enumerate;
mod filter;
mod filter_map;
mod find;
mod find_map;
@ -44,6 +45,7 @@ pub use zip::Zip;
use all::AllFuture;
use any::AnyFuture;
use enumerate::Enumerate;
use filter::Filter;
use filter_map::FilterMap;
use find::FindFuture;
use find_map::FindMapFuture;
@ -282,6 +284,34 @@ pub trait Stream {
done: false,
}
}
/// Creates a stream that uses a predicate to determine if an element
/// should be yeilded.
///
/// # Examples
///
/// Basic usage:
///
///```
/// # fn main() { async_std::task::block_on(async {
/// #
/// use std::collections::VecDeque;
/// use async_std::stream::Stream;
///
/// let s: VecDeque<usize> = vec![1, 2, 3, 4].into_iter().collect();
/// let mut s = s.filter(|i| i % 2 == 0);
///
/// assert_eq!(s.next().await, Some(2));
/// assert_eq!(s.next().await, Some(4));
/// assert_eq!(s.next().await, None);
/// #
/// # }) }
fn filter<P>(self, predicate: P) -> Filter<Self, P, Self::Item>
where
Self: Sized,
P: FnMut(&Self::Item) -> bool,
{
Filter::new(self, predicate)
}
/// Both filters and maps a stream.
///

Loading…
Cancel
Save