From e7ae10ebee69258c144d10793e829f786f2b1f2f Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Wed, 11 Sep 2019 14:44:42 +0300 Subject: [PATCH 1/2] adds stream::filter combinator --- src/stream/stream/filter.rs | 49 +++++++++++++++++++++++++++++++++++++ src/stream/stream/mod.rs | 30 +++++++++++++++++++++++ 2 files changed, 79 insertions(+) create mode 100644 src/stream/stream/filter.rs diff --git a/src/stream/stream/filter.rs b/src/stream/stream/filter.rs new file mode 100644 index 00000000..135d75d0 --- /dev/null +++ b/src/stream/stream/filter.rs @@ -0,0 +1,49 @@ +use std::marker::PhantomData; +use std::pin::Pin; + +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct Filter { + stream: S, + predicate: P, + __t: PhantomData, +} + +impl Filter { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(predicate: P); + + pub(super) fn new(stream: S, predicate: P) -> Self { + Filter { + stream, + predicate, + __t: PhantomData, + } + } +} + +impl futures_core::stream::Stream for Filter +where + S: Stream, + P: FnMut(&S::Item) -> bool, +{ + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + + match next { + Some(v) => match (self.as_mut().predicate())(&v) { + true => Poll::Ready(Some(v)), + false => { + cx.waker().wake_by_ref(); + Poll::Pending + } + }, + None => Poll::Ready(None), + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 8ddab212..195f2ebb 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -24,6 +24,7 @@ mod all; mod any; mod enumerate; +mod filter; mod filter_map; mod find; mod find_map; @@ -44,6 +45,7 @@ pub use zip::Zip; use all::AllFuture; use any::AnyFuture; use enumerate::Enumerate; +use filter::Filter; use filter_map::FilterMap; use find::FindFuture; use find_map::FindMapFuture; @@ -282,6 +284,34 @@ pub trait Stream { done: false, } } + /// Creates a stream that uses a predicate to determine if an element + /// should be yeilded. + /// + /// # Examples + /// + /// Basic usage: + /// + ///``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use std::collections::VecDeque; + /// use async_std::stream::Stream; + /// + /// let s: VecDeque = vec![1, 2, 3, 4].into_iter().collect(); + /// let mut s = s.filter(|i| i % 2 == 0); + /// + /// assert_eq!(s.next().await, Some(2)); + /// assert_eq!(s.next().await, Some(4)); + /// assert_eq!(s.next().await, None); + /// # + /// # }) } + fn filter

(self, predicate: P) -> Filter + where + Self: Sized, + P: FnMut(&Self::Item) -> bool, + { + Filter::new(self, predicate) + } /// Both filters and maps a stream. /// From e430851bc400f6bf3c5dc4562c44ae0a012e23f2 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Sat, 21 Sep 2019 16:40:01 +0300 Subject: [PATCH 2/2] export Filter type --- src/stream/stream/filter.rs | 4 ++-- src/stream/stream/mod.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/stream/stream/filter.rs b/src/stream/stream/filter.rs index 135d75d0..68211f71 100644 --- a/src/stream/stream/filter.rs +++ b/src/stream/stream/filter.rs @@ -4,8 +4,8 @@ use std::pin::Pin; use crate::stream::Stream; use crate::task::{Context, Poll}; -#[doc(hidden)] -#[allow(missing_debug_implementations)] +/// A stream to filter elements of another stream with a predicate. +#[derive(Debug)] pub struct Filter { stream: S, predicate: P, diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 195f2ebb..66645ceb 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -37,6 +37,7 @@ mod scan; mod take; mod zip; +pub use filter::Filter; pub use fuse::Fuse; pub use scan::Scan; pub use take::Take; @@ -45,7 +46,6 @@ pub use zip::Zip; use all::AllFuture; use any::AnyFuture; use enumerate::Enumerate; -use filter::Filter; use filter_map::FilterMap; use find::FindFuture; use find_map::FindMapFuture;