From bff10fe83b62d112d10e7748d78468f5b31ec972 Mon Sep 17 00:00:00 2001 From: Shady Khalifa Date: Sun, 1 Sep 2019 19:58:16 +0200 Subject: [PATCH] Stream::any implementation (#135) * add stream::any method * use `ret` macro and small improvements * fix docs return type in `ret` macro --- src/stream/stream.rs | 117 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 113 insertions(+), 4 deletions(-) diff --git a/src/stream/stream.rs b/src/stream/stream.rs index e2dc856..7a127b0 100644 --- a/src/stream/stream.rs +++ b/src/stream/stream.rs @@ -36,10 +36,15 @@ cfg_if! { macro_rules! ret { ($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>); + ($a:lifetime, $f:tt, $o:ty, $t1:ty) => (ImplFuture<$a, $o>); + ($a:lifetime, $f:tt, $o:ty, $t1:ty, $t2:ty) => (ImplFuture<$a, $o>); + } } else { macro_rules! ret { ($a:lifetime, $f:tt, $o:ty) => ($f<$a, Self>); + ($a:lifetime, $f:tt, $o:ty, $t1:ty) => ($f<$a, Self, $t1>); + ($a:lifetime, $f:tt, $o:ty, $t1:ty, $t2:ty) => ($f<$a, Self, $t1, $t2>); } } } @@ -81,7 +86,7 @@ pub trait Stream { /// # /// # }) } /// ``` - fn next<'a>(&'a mut self) -> ret!('a, NextFuture, Option) + fn next(&mut self) -> ret!('_, NextFuture, Option) where Self: Unpin; @@ -157,14 +162,71 @@ pub trait Stream { /// # }) } /// ``` #[inline] - fn all(&mut self, f: F) -> AllFuture<'_, Self, F, Self::Item> + fn all(&mut self, f: F) -> ret!('_, AllFuture, bool, F, Self::Item) where Self: Sized, F: FnMut(Self::Item) -> bool, { AllFuture { stream: self, - result: true, + result: true, // the default if the empty stream + __item: PhantomData, + f, + } + } + + /// Tests if any element of the stream matches a predicate. + /// + /// `any()` takes a closure that returns `true` or `false`. It applies + /// this closure to each element of the stream, and if any of them return + /// `true`, then so does `any()`. If they all return `false`, it + /// returns `false`. + /// + /// `any()` is short-circuiting; in other words, it will stop processing + /// as soon as it finds a `true`, given that no matter what else happens, + /// the result will also be `true`. + /// + /// An empty stream returns `false`. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use async_std::stream; + /// + /// let mut s = stream::repeat::(42).take(3); + /// assert!(s.any(|x| x == 42).await); + /// + /// # + /// # }) } + /// ``` + /// + /// Empty stream: + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use async_std::stream; + /// + /// let mut s = stream::empty::(); + /// assert!(!s.any(|_| false).await); + /// # + /// # }) } + /// ``` + #[inline] + fn any(&mut self, f: F) -> ret!('_, AnyFuture, bool, F, Self::Item) + where + Self: Sized, + F: FnMut(Self::Item) -> bool, + { + AnyFuture { + stream: self, + result: false, // the default if the empty stream __item: PhantomData, f, } @@ -174,7 +236,7 @@ pub trait Stream { impl Stream for T { type Item = ::Item; - fn next<'a>(&'a mut self) -> ret!('a, NextFuture, Option) + fn next(&mut self) -> ret!('_, NextFuture, Option) where Self: Unpin, { @@ -273,3 +335,50 @@ where } } } + +#[derive(Debug)] +pub struct AnyFuture<'a, S, F, T> +where + F: FnMut(T) -> bool, +{ + stream: &'a mut S, + f: F, + result: bool, + __item: PhantomData, +} + +impl<'a, S, F, T> AnyFuture<'a, S, F, T> +where + F: FnMut(T) -> bool, +{ + pin_utils::unsafe_pinned!(stream: &'a mut S); + pin_utils::unsafe_unpinned!(result: bool); + pin_utils::unsafe_unpinned!(f: F); +} + +impl Future for AnyFuture<'_, S, F, S::Item> +where + S: futures::Stream + Unpin + Sized, + F: FnMut(S::Item) -> bool, +{ + type Output = bool; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + use futures::Stream; + let next = futures::ready!(self.as_mut().stream().poll_next(cx)); + match next { + Some(v) => { + let result = (self.as_mut().f())(v); + *self.as_mut().result() = result; + if result { + Poll::Ready(true) + } else { + // don't forget to wake this task again to pull the next item from stream + cx.waker().wake_by_ref(); + Poll::Pending + } + } + None => Poll::Ready(self.result), + } + } +}