From 97a5f9b50c95e49639bf999227229094afeb37b0 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Tue, 10 Sep 2019 23:38:11 +0300 Subject: [PATCH] adds stream::find combinator --- src/stream/stream/find.rs | 49 +++++++++++++++++++++++++++++++++++++++ src/stream/stream/mod.rs | 46 ++++++++++++++++++++++++++++++++++++ 2 files changed, 95 insertions(+) create mode 100644 src/stream/stream/find.rs diff --git a/src/stream/stream/find.rs b/src/stream/stream/find.rs new file mode 100644 index 0000000..dfab089 --- /dev/null +++ b/src/stream/stream/find.rs @@ -0,0 +1,49 @@ +use crate::task::{Context, Poll}; +use std::marker::PhantomData; +use std::pin::Pin; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct FindFuture<'a, S, P, T> { + stream: &'a mut S, + p: P, + __t: PhantomData, +} + +impl<'a, S, P, T> FindFuture<'a, S, P, T> { + pin_utils::unsafe_pinned!(stream: &'a mut S); + pin_utils::unsafe_unpinned!(p: P); + + pub(super) fn new(stream: &'a mut S, p: P) -> Self { + FindFuture { + stream, + p, + __t: PhantomData, + } + } +} + +impl<'a, S, P> futures_core::future::Future for FindFuture<'a, S, P, S::Item> +where + S: futures_core::stream::Stream + Unpin + Sized, + P: FnMut(&S::Item) -> bool, +{ + type Output = Option; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + use futures_core::stream::Stream; + + let item = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + + match item { + Some(v) => match (self.as_mut().p())(&v) { + true => Poll::Ready(Some(v)), + false => { + cx.waker().wake_by_ref(); + Poll::Pending + } + }, + None => Poll::Ready(None), + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index eddafe2..462aeae 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -24,6 +24,7 @@ mod all; mod any; mod filter_map; +mod find; mod find_map; mod min_by; mod next; @@ -35,6 +36,7 @@ pub use take::Take; use all::AllFuture; use any::AnyFuture; use filter_map::FilterMap; +use find::FindFuture; use find_map::FindMapFuture; use min_by::MinByFuture; use next::NextFuture; @@ -321,6 +323,50 @@ pub trait Stream { } } + /// Searches for an element in a stream that satisfies a predicate. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use std::collections::VecDeque; + /// + /// let mut s: VecDeque = vec![1, 2, 3].into_iter().collect(); + /// let res = s.find(|x| *x == 2).await; + /// assert_eq!(res, Some(2)); + /// # + /// # }) } + /// ``` + /// + /// Resuming after a first find: + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use std::collections::VecDeque; + /// + /// let mut s: VecDeque = vec![1, 2, 3].into_iter().collect(); + /// let res = s.find(|x| *x == 2).await; + /// assert_eq!(res, Some(2)); + /// + /// let next = s.next().await; + /// assert_eq!(next, Some(3)); + /// # + /// # }) } + /// ``` + fn find

(&mut self, p: P) -> ret!('_, FindFuture, Option, P, Self::Item) + where + Self: Sized, + P: FnMut(&Self::Item) -> bool, + { + FindFuture::new(self, p) + } + /// Applies function to the elements of stream and returns the first non-none result. /// /// ```