From 55bdea464921206e36c88712e823c3fa0924ee41 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Sun, 8 Sep 2019 18:09:33 +0300 Subject: [PATCH 1/2] adds stream::filter_map combinator --- src/stream/stream/filter_map.rs | 48 +++++++++++++++++++++++++++++++++ src/stream/stream/mod.rs | 41 ++++++++++++++++++++++++++++ 2 files changed, 89 insertions(+) create mode 100644 src/stream/stream/filter_map.rs diff --git a/src/stream/stream/filter_map.rs b/src/stream/stream/filter_map.rs new file mode 100644 index 0000000..626a8ec --- /dev/null +++ b/src/stream/stream/filter_map.rs @@ -0,0 +1,48 @@ +use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// A stream that both filters and maps. +#[derive(Clone, Debug)] +pub struct FilterMap { + stream: S, + f: F, + __from: PhantomData, + __to: PhantomData, +} + +impl FilterMap { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(f: F); + + pub(crate) fn new(stream: S, f: F) -> Self { + FilterMap { + stream, + f, + __from: PhantomData, + __to: PhantomData, + } + } +} + +impl futures_core::stream::Stream for FilterMap +where + S: futures_core::stream::Stream, + F: FnMut(S::Item) -> Option, +{ + type Item = B; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + match next { + Some(v) => match (self.as_mut().f())(v) { + Some(b) => Poll::Ready(Some(b)), + None => { + cx.waker().wake_by_ref(); + Poll::Pending + } + }, + None => Poll::Ready(None), + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 91b111e..72bee64 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -23,6 +23,7 @@ mod all; mod any; +mod filter_map; mod min_by; mod next; mod take; @@ -31,6 +32,7 @@ pub use take::Take; use all::AllFuture; use any::AnyFuture; +use filter_map::FilterMap; use min_by::MinByFuture; use next::NextFuture; @@ -128,6 +130,45 @@ pub trait Stream { } } + /// Both filters and maps a stream. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// + /// # fn main() { async_std::task::block_on(async { + /// # + /// use std::collections::VecDeque; + /// use async_std::stream::Stream; + /// + /// let s: VecDeque<&str> = vec!["1", "lol", "3", "NaN", "5"].into_iter().collect(); + /// + /// let mut parsed = s.filter_map(|a| a.parse::().ok()); + /// + /// let one = parsed.next().await; + /// assert_eq!(one, Some(1)); + /// + /// let three = parsed.next().await; + /// assert_eq!(three, Some(3)); + /// + /// let five = parsed.next().await; + /// assert_eq!(five, Some(5)); + /// + /// let end = parsed.next().await; + /// assert_eq!(end, None); + /// + /// # + /// # }) } + fn filter_map(self, f: F) -> FilterMap + where + Self: Sized, + F: FnMut(Self::Item) -> Option, + { + FilterMap::new(self, f) + } + /// Returns the element that gives the minimum value with respect to the /// specified comparison function. If several elements are equally minimum, /// the first element is returned. If the stream is empty, `None` is returned. From 9b381e427f90fe211f0b18adf6200ac5161678d3 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Tue, 10 Sep 2019 15:01:25 +0300 Subject: [PATCH 2/2] Apply suggestions from code review Co-Authored-By: Yoshua Wuyts --- src/stream/stream/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 72bee64..89881c0 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -137,7 +137,6 @@ pub trait Stream { /// Basic usage: /// /// ``` - /// /// # fn main() { async_std::task::block_on(async { /// # /// use std::collections::VecDeque; @@ -158,7 +157,6 @@ pub trait Stream { /// /// let end = parsed.next().await; /// assert_eq!(end, None); - /// /// # /// # }) } fn filter_map(self, f: F) -> FilterMap