163: adds stream::filter_map combinator r=yoshuawuyts a=montekki

Implements a `flat_map` combinator. Currently the same about `ret!` as in #162 .

Also the naming should probably be `FilterMapStream`, but in that case `Take` stream should also change it's name i guess.

Stdlib: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.flat_map
Ref: #129 

Co-authored-by: Fedor Sakharov <fedor.sakharov@gmail.com>
staging
bors[bot] 5 years ago committed by GitHub
commit 6d1e71fb68
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,48 @@
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
/// A stream that both filters and maps.
#[derive(Clone, Debug)]
pub struct FilterMap<S, F, T, B> {
stream: S,
f: F,
__from: PhantomData<T>,
__to: PhantomData<B>,
}
impl<S, F, T, B> FilterMap<S, F, T, B> {
pin_utils::unsafe_pinned!(stream: S);
pin_utils::unsafe_unpinned!(f: F);
pub(crate) fn new(stream: S, f: F) -> Self {
FilterMap {
stream,
f,
__from: PhantomData,
__to: PhantomData,
}
}
}
impl<S, F, B> futures_core::stream::Stream for FilterMap<S, F, S::Item, B>
where
S: futures_core::stream::Stream,
F: FnMut(S::Item) -> Option<B>,
{
type Item = B;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
match next {
Some(v) => match (self.as_mut().f())(v) {
Some(b) => Poll::Ready(Some(b)),
None => {
cx.waker().wake_by_ref();
Poll::Pending
}
},
None => Poll::Ready(None),
}
}
}

@ -23,6 +23,7 @@
mod all;
mod any;
mod filter_map;
mod min_by;
mod next;
mod nth;
@ -32,6 +33,7 @@ pub use take::Take;
use all::AllFuture;
use any::AnyFuture;
use filter_map::FilterMap;
use min_by::MinByFuture;
use next::NextFuture;
use nth::NthFuture;
@ -130,6 +132,43 @@ pub trait Stream {
}
}
/// Both filters and maps a stream.
///
/// # Examples
///
/// Basic usage:
///
/// ```
/// # fn main() { async_std::task::block_on(async {
/// #
/// use std::collections::VecDeque;
/// use async_std::stream::Stream;
///
/// let s: VecDeque<&str> = vec!["1", "lol", "3", "NaN", "5"].into_iter().collect();
///
/// let mut parsed = s.filter_map(|a| a.parse::<u32>().ok());
///
/// let one = parsed.next().await;
/// assert_eq!(one, Some(1));
///
/// let three = parsed.next().await;
/// assert_eq!(three, Some(3));
///
/// let five = parsed.next().await;
/// assert_eq!(five, Some(5));
///
/// let end = parsed.next().await;
/// assert_eq!(end, None);
/// #
/// # }) }
fn filter_map<B, F>(self, f: F) -> FilterMap<Self, F, Self::Item, B>
where
Self: Sized,
F: FnMut(Self::Item) -> Option<B>,
{
FilterMap::new(self, f)
}
/// Returns the element that gives the minimum value with respect to the
/// specified comparison function. If several elements are equally minimum,
/// the first element is returned. If the stream is empty, `None` is returned.

Loading…
Cancel
Save