diff --git a/src/stream/stream.rs b/src/stream/stream.rs index 6623207b..e2dc856b 100644 --- a/src/stream/stream.rs +++ b/src/stream/stream.rs @@ -27,6 +27,7 @@ use cfg_if::cfg_if; use crate::future::Future; use crate::task::{Context, Poll}; +use std::marker::PhantomData; cfg_if! { if #[cfg(feature = "docs")] { @@ -111,6 +112,63 @@ pub trait Stream { remaining: n, } } + + /// Tests if every element of the stream matches a predicate. + /// + /// `all()` takes a closure that returns `true` or `false`. It applies + /// this closure to each element of the stream, and if they all return + /// `true`, then so does `all()`. If any of them return `false`, it + /// returns `false`. + /// + /// `all()` is short-circuiting; in other words, it will stop processing + /// as soon as it finds a `false`, given that no matter what else happens, + /// the result will also be `false`. + /// + /// An empty stream returns `true`. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use async_std::stream; + /// + /// let mut s = stream::repeat::(42).take(3); + /// assert!(s.all(|x| x == 42).await); + /// + /// # + /// # }) } + /// ``` + /// + /// Empty stream: + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use async_std::stream; + /// + /// let mut s = stream::empty::(); + /// assert!(s.all(|_| false).await); + /// # + /// # }) } + /// ``` + #[inline] + fn all(&mut self, f: F) -> AllFuture<'_, Self, F, Self::Item> + where + Self: Sized, + F: FnMut(Self::Item) -> bool, + { + AllFuture { + stream: self, + result: true, + __item: PhantomData, + f, + } + } } impl Stream for T { @@ -168,3 +226,50 @@ impl futures::Stream for Take { } } } + +#[derive(Debug)] +pub struct AllFuture<'a, S, F, T> +where + F: FnMut(T) -> bool, +{ + stream: &'a mut S, + f: F, + result: bool, + __item: PhantomData, +} + +impl<'a, S, F, T> AllFuture<'a, S, F, T> +where + F: FnMut(T) -> bool, +{ + pin_utils::unsafe_pinned!(stream: &'a mut S); + pin_utils::unsafe_unpinned!(result: bool); + pin_utils::unsafe_unpinned!(f: F); +} + +impl Future for AllFuture<'_, S, F, S::Item> +where + S: futures::Stream + Unpin + Sized, + F: FnMut(S::Item) -> bool, +{ + type Output = bool; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + use futures::Stream; + let next = futures::ready!(self.as_mut().stream().poll_next(cx)); + match next { + Some(v) => { + let result = (self.as_mut().f())(v); + *self.as_mut().result() = result; + if result { + // don't forget to wake this task again to pull the next item from stream + cx.waker().wake_by_ref(); + Poll::Pending + } else { + Poll::Ready(false) + } + } + None => Poll::Ready(self.result), + } + } +}